1use std::cmp::Ordering;
2use std::collections::BTreeMap;
3
4use hyper_util::rt::TokioIo;
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::ReceiverStream;
7use tonic::Request;
8use tonic::codegen::async_trait;
9use tonic::metadata::MetadataValue;
10use tonic::service::Interceptor;
11use tonic::service::interceptor::InterceptedService;
12use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
13use tower::service_fn;
14
15use crate::env::{ENV_HOST_SERVICE_SOCKET, ENV_HOST_SERVICE_TOKEN, HOST_SERVICE_BINDING_HEADER};
16use crate::generated::v1::{self as pb, indexed_db_client::IndexedDbClient};
17
/// gRPC transport used by the IndexedDB client: a tonic [`Channel`] wrapped in an
/// [`InterceptedService`] that runs the `RelayTokenInterceptor` on each request.
type IndexedDbTransport = InterceptedService<Channel, RelayTokenInterceptor>;

// Metadata header name for the host-service relay token.
// NOTE(review): presumably attached by `RelayTokenInterceptor`; its use is not visible in this section.
const INDEXEDDB_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
// Capacity of the mpsc channel that feeds client messages into a cursor stream.
const CURSOR_CHANNEL_BUFFER: usize = 1;
// Capacity of the mpsc channel that feeds client messages into a transaction stream.
const TRANSACTION_CHANNEL_BUFFER: usize = 1;
23
/// Errors produced by the IndexedDB client and the in-memory cursor snapshot.
#[derive(Debug, thiserror::Error)]
pub enum IndexedDBError {
    /// The requested record or cursor position does not exist.
    #[error("not found")]
    NotFound,
    /// NOTE(review): presumably reported when inserting a key that is already present;
    /// no code in this section constructs it.
    #[error("already exists")]
    AlreadyExists,
    /// A value was requested from a cursor that was opened keys-only.
    #[error("cursor is keys-only; value not available")]
    KeysOnly,
    /// A caller-supplied argument was rejected; the payload is the message.
    #[error("{0}")]
    InvalidArgument(String),
    /// The transaction stream did not follow the expected begin/commit/abort exchange.
    #[error("{0}")]
    Transaction(String),
    /// Failure while building or connecting the tonic transport.
    #[error("{0}")]
    Transport(#[from] tonic::transport::Error),
    /// A gRPC status, either received from the server or synthesized locally.
    #[error("{0}")]
    Status(#[from] tonic::Status),
    /// A required environment variable is missing; the payload is the message.
    #[error("{0}")]
    Env(String),
}
52
/// A stored record: an ordered map from field name to JSON value.
pub type Record = BTreeMap<String, serde_json::Value>;
55
/// A key interval used to restrict reads, deletes and cursors.
///
/// A bound of `None` leaves that side unbounded. When the matching `*_open`
/// flag is `true` the bound itself is excluded from the range.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyRange {
    /// Lower bound, or `None` for no lower limit.
    pub lower: Option<serde_json::Value>,
    /// Upper bound, or `None` for no upper limit.
    pub upper: Option<serde_json::Value>,
    /// Exclude keys equal to `lower`.
    pub lower_open: bool,
    /// Exclude keys equal to `upper`.
    pub upper_open: bool,
}
68
/// Definition of one index on an object store; forwarded field-for-field to the
/// protobuf `IndexSchema` when the store is created.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Key path segments. NOTE(review): presumably record field names that make up the
    /// index key — confirm against the host service.
    pub key_path: Vec<String>,
    /// Uniqueness flag passed to the host service.
    pub unique: bool,
}
79
/// Schema supplied when creating an object store.
#[derive(Debug, Clone, PartialEq)]
pub struct ObjectStoreSchema {
    /// Indexes to create on the store.
    pub indexes: Vec<IndexSchema>,
}
86
/// Traversal direction of a cursor.
///
/// In the snapshot implementation the `Prev*` variants iterate in reverse order
/// and the `*Unique` variants skip repeated keys on index cursors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorDirection {
    /// Forward order.
    Next,
    /// Forward order, skipping repeated index keys.
    NextUnique,
    /// Reverse order.
    Prev,
    /// Reverse order, skipping repeated index keys.
    PrevUnique,
}
99
100impl CursorDirection {
101 fn to_proto(self) -> i32 {
102 match self {
103 Self::Next => pb::CursorDirection::CursorNext as i32,
104 Self::NextUnique => pb::CursorDirection::CursorNextUnique as i32,
105 Self::Prev => pb::CursorDirection::CursorPrev as i32,
106 Self::PrevUnique => pb::CursorDirection::CursorPrevUnique as i32,
107 }
108 }
109}
110
/// Access mode requested when a transaction is opened.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionMode {
    /// Maps to the protobuf `TransactionReadonly` mode.
    Readonly,
    /// Maps to the protobuf `TransactionReadwrite` mode.
    Readwrite,
}
119
120impl TransactionMode {
121 fn to_proto(self) -> i32 {
122 match self {
123 Self::Readonly => pb::TransactionMode::TransactionReadonly as i32,
124 Self::Readwrite => pb::TransactionMode::TransactionReadwrite as i32,
125 }
126 }
127}
128
/// Durability hint sent with a transaction begin request.
///
/// NOTE(review): the effect of each hint is decided by the host service and is
/// not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransactionDurabilityHint {
    /// The default hint; also the `Default` value of this enum.
    #[default]
    Default,
    /// Maps to the protobuf `TransactionDurabilityStrict` hint.
    Strict,
    /// Maps to the protobuf `TransactionDurabilityRelaxed` hint.
    Relaxed,
}
140
141impl TransactionDurabilityHint {
142 fn to_proto(self) -> i32 {
143 match self {
144 Self::Default => pb::TransactionDurabilityHint::TransactionDurabilityDefault as i32,
145 Self::Strict => pb::TransactionDurabilityHint::TransactionDurabilityStrict as i32,
146 Self::Relaxed => pb::TransactionDurabilityHint::TransactionDurabilityRelaxed as i32,
147 }
148 }
149}
150
/// Optional settings for opening a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct TransactionOptions {
    /// Durability hint forwarded in the begin request.
    pub durability_hint: TransactionDurabilityHint,
}
157
/// Database-level operations: managing object stores and opening transactions.
///
/// Only the signatures are defined here; the behavior of each method is up to
/// the implementor.
#[async_trait]
pub trait IndexedDBApi: Send {
    /// Handle type returned for a single object store.
    type ObjectStore: ObjectStoreApi;
    /// Handle type returned for an open transaction.
    type Transaction: TransactionApi;

    /// Creates the object store `name` with `schema` and returns a handle to it.
    async fn create_object_store(
        &mut self,
        name: &str,
        schema: ObjectStoreSchema,
    ) -> Result<Self::ObjectStore, IndexedDBError>;

    /// Deletes the object store `name`.
    async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError>;

    /// Returns a handle to the object store `name`.
    fn object_store(&self, name: &str) -> Self::ObjectStore;

    /// Opens a transaction over `stores` with the given `mode` and `options`.
    async fn transaction(
        &self,
        stores: &[&str],
        mode: TransactionMode,
        options: TransactionOptions,
    ) -> Result<Self::Transaction, IndexedDBError>;
}
187
/// Operations on a single object store outside of an explicit transaction.
///
/// Only signatures are defined here. NOTE(review): method semantics presumably
/// follow the IndexedDB operations of the same name — confirm against the
/// implementor.
#[async_trait]
pub trait ObjectStoreApi: Send {
    /// Handle type for an index of this store.
    type Index: IndexApi;
    /// Cursor type produced by this store.
    type Cursor: CursorApi;

    /// Reads the record identified by `id`.
    async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError>;

    /// Reads the key for `id`.
    async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError>;

    /// Adds `record` to the store.
    async fn add(&mut self, record: Record) -> Result<(), IndexedDBError>;

    /// Puts `record` into the store.
    async fn put(&mut self, record: Record) -> Result<(), IndexedDBError>;

    /// Deletes the record identified by `id`.
    async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError>;

    /// Clears the store.
    async fn clear(&mut self) -> Result<(), IndexedDBError>;

    /// Reads all records, optionally restricted to `range`.
    async fn get_all(&mut self, range: Option<KeyRange>) -> Result<Vec<Record>, IndexedDBError>;

    /// Reads all keys, optionally restricted to `range`.
    async fn get_all_keys(
        &mut self,
        range: Option<KeyRange>,
    ) -> Result<Vec<String>, IndexedDBError>;

    /// Counts records, optionally restricted to `range`.
    async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError>;

    /// Deletes the records in `range`; the returned number is presumably the
    /// count of deleted records.
    async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError>;

    /// Returns a handle to the index `name`.
    fn index(&self, name: &str) -> Self::Index;

    /// Opens a cursor over the store.
    async fn open_cursor(
        &mut self,
        range: Option<KeyRange>,
        direction: CursorDirection,
    ) -> Result<Self::Cursor, IndexedDBError>;

    /// Opens a keys-only cursor over the store.
    async fn open_key_cursor(
        &mut self,
        range: Option<KeyRange>,
        direction: CursorDirection,
    ) -> Result<Self::Cursor, IndexedDBError>;
}
246
/// Operations on an index of an object store outside of an explicit transaction.
///
/// Every method takes `values`, the index key values to look up. Only
/// signatures are defined here. NOTE(review): semantics presumably follow the
/// IndexedDB index operations of the same name — confirm against the implementor.
#[async_trait]
pub trait IndexApi: Send {
    /// Cursor type produced by this index.
    type Cursor: CursorApi;

    /// Reads a record matching `values`.
    async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError>;

    /// Reads the key of a record matching `values`.
    async fn get_key(&mut self, values: &[serde_json::Value]) -> Result<String, IndexedDBError>;

    /// Reads all records matching `values`, optionally restricted to `range`.
    async fn get_all(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<Vec<Record>, IndexedDBError>;

    /// Reads the keys of all records matching `values`, optionally restricted to `range`.
    async fn get_all_keys(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<Vec<String>, IndexedDBError>;

    /// Counts records matching `values`, optionally restricted to `range`.
    async fn count(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<i64, IndexedDBError>;

    /// Deletes records matching `values`; the returned number is presumably the
    /// count of deleted records.
    async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError>;

    /// Deletes records matching `values` within `range`.
    async fn delete_range(
        &mut self,
        values: &[serde_json::Value],
        range: KeyRange,
    ) -> Result<i64, IndexedDBError>;

    /// Opens a cursor over the index.
    async fn open_cursor(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
        direction: CursorDirection,
    ) -> Result<Self::Cursor, IndexedDBError>;

    /// Opens a keys-only cursor over the index.
    async fn open_key_cursor(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
        direction: CursorDirection,
    ) -> Result<Self::Cursor, IndexedDBError>;
}
306
/// An open transaction.
///
/// Object-store handles borrow the transaction mutably, so only one handle can
/// be in use at a time.
#[async_trait]
pub trait TransactionApi: Send {
    /// Object-store handle type, borrowing the transaction for `'a`.
    type ObjectStore<'a>: TransactionObjectStoreApi + 'a
    where
        Self: 'a;

    /// Returns a handle to the object store `name` inside this transaction.
    fn object_store<'a>(&'a mut self, name: &str) -> Self::ObjectStore<'a>;

    /// Commits the transaction.
    async fn commit(&mut self) -> Result<(), IndexedDBError>;

    /// Aborts the transaction, passing `reason` along.
    async fn abort(&mut self, reason: &str) -> Result<(), IndexedDBError>;
}
324
/// Operations on an object store within a transaction.
///
/// The methods mirror [`ObjectStoreApi`] minus the cursor operations. Only
/// signatures are defined here; semantics are up to the implementor.
#[async_trait]
pub trait TransactionObjectStoreApi: Send {
    /// Index handle type, borrowing this store handle for `'a`.
    type Index<'a>: TransactionIndexApi + 'a
    where
        Self: 'a;

    /// Reads the record identified by `id`.
    async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError>;

    /// Reads the key for `id`.
    async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError>;

    /// Adds `record` to the store.
    async fn add(&mut self, record: Record) -> Result<(), IndexedDBError>;

    /// Puts `record` into the store.
    async fn put(&mut self, record: Record) -> Result<(), IndexedDBError>;

    /// Deletes the record identified by `id`.
    async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError>;

    /// Clears the store.
    async fn clear(&mut self) -> Result<(), IndexedDBError>;

    /// Reads all records, optionally restricted to `range`.
    async fn get_all(&mut self, range: Option<KeyRange>) -> Result<Vec<Record>, IndexedDBError>;

    /// Reads all keys, optionally restricted to `range`.
    async fn get_all_keys(
        &mut self,
        range: Option<KeyRange>,
    ) -> Result<Vec<String>, IndexedDBError>;

    /// Counts records, optionally restricted to `range`.
    async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError>;

    /// Deletes the records in `range`.
    async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError>;

    /// Returns a handle to the index `name` inside the same transaction.
    fn index<'a>(&'a mut self, name: &str) -> Self::Index<'a>;
}
369
/// Operations on an index within a transaction.
///
/// The methods mirror [`IndexApi`] minus the cursor operations. Only signatures
/// are defined here; semantics are up to the implementor.
#[async_trait]
pub trait TransactionIndexApi: Send {
    /// Reads a record matching `values`.
    async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError>;

    /// Reads the key of a record matching `values`.
    async fn get_key(&mut self, values: &[serde_json::Value]) -> Result<String, IndexedDBError>;

    /// Reads all records matching `values`, optionally restricted to `range`.
    async fn get_all(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<Vec<Record>, IndexedDBError>;

    /// Reads the keys of all records matching `values`, optionally restricted to `range`.
    async fn get_all_keys(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<Vec<String>, IndexedDBError>;

    /// Counts records matching `values`, optionally restricted to `range`.
    async fn count(
        &mut self,
        values: &[serde_json::Value],
        range: Option<KeyRange>,
    ) -> Result<i64, IndexedDBError>;

    /// Deletes records matching `values`.
    async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError>;

    /// Deletes records matching `values` within `range`.
    async fn delete_range(
        &mut self,
        values: &[serde_json::Value],
        range: KeyRange,
    ) -> Result<i64, IndexedDBError>;
}
410
/// A cursor over an object store or index.
///
/// The movement methods return `Ok(true)` when the cursor is positioned on an
/// entry afterwards and `Ok(false)` otherwise (this is how the `Cursor` type in
/// this file behaves).
#[async_trait]
pub trait CursorApi: Send {
    /// Key of the current entry, if any.
    fn key(&self) -> Option<serde_json::Value>;

    /// Primary key of the current entry.
    fn primary_key(&self) -> &str;

    /// Record of the current entry.
    fn value(&self) -> Result<Record, IndexedDBError>;

    /// Moves to the next entry.
    async fn continue_next(&mut self) -> Result<bool, IndexedDBError>;

    /// Moves forward to `key`.
    async fn continue_to_key(&mut self, key: serde_json::Value) -> Result<bool, IndexedDBError>;

    /// Moves forward by `count` entries.
    async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError>;

    /// Deletes the current entry.
    async fn delete(&mut self) -> Result<(), IndexedDBError>;

    /// Replaces the current entry's record with `value`.
    async fn update(&mut self, value: Record) -> Result<(), IndexedDBError>;

    /// Closes the cursor, consuming it.
    async fn close(self) -> Result<(), IndexedDBError>
    where
        Self: Sized;
}
443
/// Parameters describing a cursor to open; consumed by [`IndexedDBCursorSnapshot::new`].
#[derive(Debug, Clone)]
pub struct IndexedDBOpenCursorRequest {
    /// Object store name.
    pub store: String,
    /// Optional key range restricting the cursor.
    pub range: Option<KeyRange>,
    /// Traversal direction.
    pub direction: CursorDirection,
    /// Whether the cursor exposes keys only.
    pub keys_only: bool,
    /// Index name; an empty string means the cursor is not an index cursor.
    pub index: String,
    /// Index key values. NOTE(review): not read by the snapshot code in this section.
    pub values: Vec<serde_json::Value>,
}
460
461impl Default for IndexedDBOpenCursorRequest {
462 fn default() -> Self {
463 Self {
464 store: String::new(),
465 range: None,
466 direction: CursorDirection::Next,
467 keys_only: false,
468 index: String::new(),
469 values: Vec::new(),
470 }
471 }
472}
473
/// One entry held by an [`IndexedDBCursorSnapshot`].
#[derive(Debug, Clone, PartialEq)]
pub struct IndexedDBCursorSnapshotEntry {
    /// Cursor key; the primary sort key and the value tested against ranges.
    pub key: serde_json::Value,
    /// Primary key in string form.
    pub primary_key: String,
    /// Primary key as a JSON value; the tie-breaker when two cursor keys are equal.
    pub primary_key_value: serde_json::Value,
    /// The record carried by this entry.
    pub record: Record,
}
486
/// In-memory cursor over a sorted, range-filtered list of entries.
#[derive(Debug, Clone)]
pub struct IndexedDBCursorSnapshot {
    /// `true` when the cursor was opened on an index (non-empty index name).
    pub index_cursor: bool,
    /// Copied from the open request; not consulted by the snapshot's own methods.
    pub keys_only: bool,
    /// `true` for the `Prev` / `PrevUnique` directions.
    pub reverse: bool,
    /// `true` for the `NextUnique` / `PrevUnique` directions.
    pub unique: bool,
    /// Entries in iteration order.
    pub entries: Vec<IndexedDBCursorSnapshotEntry>,
    /// Index of the current entry; `-1` before the first move.
    pub pos: isize,
}
507
508impl IndexedDBCursorSnapshot {
509 pub fn new(req: &IndexedDBOpenCursorRequest) -> Self {
511 Self {
512 index_cursor: !req.index.is_empty(),
513 keys_only: req.keys_only,
514 reverse: matches!(
515 req.direction,
516 CursorDirection::Prev | CursorDirection::PrevUnique
517 ),
518 unique: matches!(
519 req.direction,
520 CursorDirection::NextUnique | CursorDirection::PrevUnique
521 ),
522 entries: Vec::new(),
523 pos: -1,
524 }
525 }
526
527 pub fn load(
529 &mut self,
530 mut entries: Vec<IndexedDBCursorSnapshotEntry>,
531 range: Option<&KeyRange>,
532 ) -> Result<(), IndexedDBError> {
533 entries.sort_by(|left, right| {
534 let mut cmp = compare_indexeddb_values(&left.key, &right.key);
535 if cmp == Ordering::Equal {
536 cmp = compare_indexeddb_values(&left.primary_key_value, &right.primary_key_value);
537 }
538 if self.reverse { cmp.reverse() } else { cmp }
539 });
540 self.entries = self.apply_range(entries, range)?;
541 self.pos = -1;
542 Ok(())
543 }
544
545 pub fn apply_range(
547 &self,
548 entries: Vec<IndexedDBCursorSnapshotEntry>,
549 range: Option<&KeyRange>,
550 ) -> Result<Vec<IndexedDBCursorSnapshotEntry>, IndexedDBError> {
551 let Some(range) = range else {
552 return Ok(entries);
553 };
554 let (lower, upper) = indexeddb_range_bounds(Some(range), self.index_cursor);
555 let mut filtered = Vec::with_capacity(entries.len());
556 for entry in entries {
557 let key = normalize_indexeddb_bound(&entry.key, self.index_cursor);
558 if let Some(lower) = &lower {
559 let cmp = compare_indexeddb_values(&key, lower);
560 if range.lower_open && cmp != Ordering::Greater {
561 continue;
562 }
563 if !range.lower_open && cmp == Ordering::Less {
564 continue;
565 }
566 }
567 if let Some(upper) = &upper {
568 let cmp = compare_indexeddb_values(&key, upper);
569 if range.upper_open && cmp != Ordering::Less {
570 continue;
571 }
572 if !range.upper_open && cmp == Ordering::Greater {
573 continue;
574 }
575 }
576 filtered.push(entry);
577 }
578 Ok(filtered)
579 }
580
581 #[allow(clippy::should_implement_trait)]
583 pub fn next(&mut self) -> Result<Option<&IndexedDBCursorSnapshotEntry>, IndexedDBError> {
584 if self.unique
585 && self.index_cursor
586 && self.pos >= 0
587 && (self.pos as usize) < self.entries.len()
588 {
589 let previous = self.entries[self.pos as usize].key.clone();
590 self.pos += 1;
591 while (self.pos as usize) < self.entries.len() {
592 if compare_indexeddb_values(&self.entries[self.pos as usize].key, &previous)
593 != Ordering::Equal
594 {
595 return Ok(Some(self.current()?));
596 }
597 self.pos += 1;
598 }
599 return Ok(None);
600 }
601
602 self.pos += 1;
603 if (self.pos as usize) >= self.entries.len() {
604 return Ok(None);
605 }
606 Ok(Some(self.current()?))
607 }
608
609 pub fn continue_to_key(
611 &mut self,
612 target: &serde_json::Value,
613 ) -> Result<Option<&IndexedDBCursorSnapshotEntry>, IndexedDBError> {
614 let previous = if self.unique
615 && self.index_cursor
616 && self.pos >= 0
617 && (self.pos as usize) < self.entries.len()
618 {
619 Some(self.entries[self.pos as usize].key.clone())
620 } else {
621 None
622 };
623 self.pos += 1;
624 while (self.pos as usize) < self.entries.len() {
625 let current = &self.entries[self.pos as usize].key;
626 if let Some(previous) = &previous {
627 if self.unique
628 && self.index_cursor
629 && compare_indexeddb_values(current, previous) == Ordering::Equal
630 {
631 self.pos += 1;
632 continue;
633 }
634 }
635 let cmp = compare_indexeddb_values(current, target);
636 if self.reverse {
637 if cmp != Ordering::Greater {
638 return Ok(Some(self.current()?));
639 }
640 } else if cmp != Ordering::Less {
641 return Ok(Some(self.current()?));
642 }
643 self.pos += 1;
644 }
645 Ok(None)
646 }
647
648 pub fn advance(
650 &mut self,
651 count: i32,
652 ) -> Result<Option<&IndexedDBCursorSnapshotEntry>, IndexedDBError> {
653 if count <= 0 {
654 return Err(IndexedDBError::InvalidArgument(
655 "advance count must be positive".to_string(),
656 ));
657 }
658 for i in 0..count {
659 if self.next()?.is_none() {
660 return Ok(None);
661 }
662 if i == count - 1 {
663 return Ok(Some(self.current()?));
664 }
665 }
666 Ok(None)
667 }
668
669 pub fn current(&self) -> Result<&IndexedDBCursorSnapshotEntry, IndexedDBError> {
671 if self.pos < 0 || (self.pos as usize) >= self.entries.len() {
672 return Err(IndexedDBError::NotFound);
673 }
674 Ok(&self.entries[self.pos as usize])
675 }
676}
677
/// Free-function form of [`IndexedDBCursorSnapshot::new`]: builds an empty
/// snapshot configured from `req`.
pub fn new_indexeddb_cursor_snapshot(req: &IndexedDBOpenCursorRequest) -> IndexedDBCursorSnapshot {
    IndexedDBCursorSnapshot::new(req)
}
682
683pub fn indexeddb_range_bounds(
688 range: Option<&KeyRange>,
689 index_cursor: bool,
690) -> (Option<serde_json::Value>, Option<serde_json::Value>) {
691 let Some(range) = range else {
692 return (None, None);
693 };
694 let lower = range
695 .lower
696 .as_ref()
697 .map(|value| normalize_indexeddb_bound(value, index_cursor));
698 let upper = range
699 .upper
700 .as_ref()
701 .map(|value| normalize_indexeddb_bound(value, index_cursor));
702 (lower, upper)
703}
704
705pub fn compare_indexeddb_values(left: &serde_json::Value, right: &serde_json::Value) -> Ordering {
707 match (left, right) {
708 (serde_json::Value::Array(left), serde_json::Value::Array(right)) => {
709 for (i, left_value) in left.iter().enumerate() {
710 let Some(right_value) = right.get(i) else {
711 return Ordering::Greater;
712 };
713 let cmp = compare_indexeddb_values(left_value, right_value);
714 if cmp != Ordering::Equal {
715 return cmp;
716 }
717 }
718 left.len().cmp(&right.len())
719 }
720 (serde_json::Value::String(left), serde_json::Value::String(right)) => left.cmp(right),
721 (serde_json::Value::Bool(left), serde_json::Value::Bool(right)) => left.cmp(right),
722 (serde_json::Value::Number(left), serde_json::Value::Number(right)) => {
723 compare_json_numbers(left, right)
724 }
725 _ => left.to_string().cmp(&right.to_string()),
726 }
727}
728
729fn normalize_indexeddb_bound(value: &serde_json::Value, index_cursor: bool) -> serde_json::Value {
730 if !index_cursor {
731 return value.clone();
732 }
733 if matches!(value, serde_json::Value::Array(_)) {
734 return value.clone();
735 }
736 serde_json::Value::Array(vec![value.clone()])
737}
738
739fn compare_json_numbers(left: &serde_json::Number, right: &serde_json::Number) -> Ordering {
740 if let (Some(left), Some(right)) = (left.as_i64(), right.as_i64()) {
741 return left.cmp(&right);
742 }
743 if let (Some(left), Some(right)) = (left.as_u64(), right.as_u64()) {
744 return left.cmp(&right);
745 }
746 if let (Some(left), Some(right)) = (left.as_i64(), right.as_u64()) {
747 if left < 0 {
748 return Ordering::Less;
749 }
750 return (left as u64).cmp(&right);
751 }
752 if let (Some(left), Some(right)) = (left.as_u64(), right.as_i64()) {
753 if right < 0 {
754 return Ordering::Greater;
755 }
756 return left.cmp(&(right as u64));
757 }
758 match (left.as_f64(), right.as_f64()) {
759 (Some(left), Some(right)) => left.partial_cmp(&right).unwrap_or(Ordering::Equal),
760 _ => left.to_string().cmp(&right.to_string()),
761 }
762}
763
// Unit tests for the in-memory cursor snapshot and the key comparison helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Builds a snapshot entry with an empty record.
    fn entry(
        key: serde_json::Value,
        primary_key: &str,
        primary_key_value: serde_json::Value,
    ) -> IndexedDBCursorSnapshotEntry {
        IndexedDBCursorSnapshotEntry {
            key,
            primary_key: primary_key.to_string(),
            primary_key_value,
            record: Record::new(),
        }
    }

    // A NextUnique index cursor sorts by key then primary key, applies the
    // inclusive range, and yields one entry per distinct key.
    #[test]
    fn cursor_snapshot_sorts_ranges_and_skips_duplicate_unique_index_keys() {
        let mut snapshot = new_indexeddb_cursor_snapshot(&IndexedDBOpenCursorRequest {
            direction: CursorDirection::NextUnique,
            index: "by_status".to_string(),
            ..Default::default()
        });

        snapshot
            .load(
                vec![
                    entry(json!(["todo"]), "issue-2", json!("issue-2")),
                    entry(json!(["done"]), "issue-3", json!("issue-3")),
                    entry(json!(["todo"]), "issue-1", json!("issue-1")),
                ],
                Some(&KeyRange {
                    lower: Some(json!(["done"])),
                    upper: Some(json!(["todo"])),
                    lower_open: false,
                    upper_open: false,
                }),
            )
            .expect("load");

        assert_eq!(
            snapshot.next().expect("first").unwrap().primary_key,
            "issue-3"
        );
        assert_eq!(
            snapshot.next().expect("second").unwrap().primary_key,
            "issue-1"
        );
        assert!(snapshot.next().expect("exhausted").is_none());
    }

    // `advance(1)` moves one entry forward from the current position.
    #[test]
    fn cursor_snapshot_advance_moves_exactly_count_entries_from_current_position() {
        let mut snapshot = new_indexeddb_cursor_snapshot(&IndexedDBOpenCursorRequest::default());
        snapshot
            .load(
                vec![
                    entry(json!("a"), "a", json!("a")),
                    entry(json!("b"), "b", json!("b")),
                    entry(json!("c"), "c", json!("c")),
                ],
                None,
            )
            .expect("load");

        assert_eq!(snapshot.next().expect("first").unwrap().primary_key, "a");
        assert_eq!(
            snapshot.advance(1).expect("second").unwrap().primary_key,
            "b"
        );
        assert_eq!(
            snapshot.advance(1).expect("third").unwrap().primary_key,
            "c"
        );
    }

    // Scalar entry keys and scalar bounds are both normalized on index cursors,
    // so range filtering still matches; the stored key is left as a scalar.
    #[test]
    fn cursor_snapshot_index_range_accepts_scalar_entry_keys() {
        let mut snapshot = new_indexeddb_cursor_snapshot(&IndexedDBOpenCursorRequest {
            index: "by_status".to_string(),
            ..Default::default()
        });
        snapshot
            .load(
                vec![
                    entry(json!("done"), "issue-2", json!("issue-2")),
                    entry(json!("active"), "issue-1", json!("issue-1")),
                ],
                Some(&KeyRange {
                    lower: Some(json!("active")),
                    upper: Some(json!("active")),
                    lower_open: false,
                    upper_open: false,
                }),
            )
            .expect("load");

        let first = snapshot.next().expect("first").unwrap();
        assert_eq!(first.primary_key, "issue-1");
        assert_eq!(first.key, json!("active"));
        assert!(snapshot.next().expect("exhausted").is_none());
    }

    // For index cursors a scalar bound is wrapped in an array and an array
    // bound is left untouched.
    #[test]
    fn range_bounds_normalize_scalar_index_bounds() {
        let (lower, upper) = indexeddb_range_bounds(
            Some(&KeyRange {
                lower: Some(json!("active")),
                upper: Some(json!(["done"])),
                lower_open: false,
                upper_open: false,
            }),
            true,
        );

        assert_eq!(lower, Some(json!(["active"])));
        assert_eq!(upper, Some(json!(["done"])));
    }

    // Composite (array) keys compare element by element.
    #[test]
    fn compare_values_orders_composite_keys() {
        assert_eq!(
            compare_indexeddb_values(&json!(["active", 1]), &json!(["active", 2])),
            Ordering::Less
        );
        assert_eq!(
            compare_indexeddb_values(&json!(["active", 2]), &json!(["active", 2])),
            Ordering::Equal
        );
        assert_eq!(
            compare_indexeddb_values(&json!(["active", 3]), &json!(["active", 2])),
            Ordering::Greater
        );
    }

    // Integers beyond 2^53 must be compared exactly rather than through f64.
    #[test]
    fn compare_values_orders_large_integer_keys_exactly() {
        assert_eq!(
            compare_indexeddb_values(
                &json!(9_007_199_254_740_993u64),
                &json!(9_007_199_254_740_992u64)
            ),
            Ordering::Greater
        );
        assert_eq!(
            compare_indexeddb_values(&json!(i64::MAX), &json!(u64::MAX)),
            Ordering::Less
        );
    }
}
/// Client side of a bidirectional cursor stream with the host service.
pub struct Cursor {
    // Sends client messages (commands) into the request stream.
    tx: mpsc::Sender<pb::CursorClientMessage>,
    // Server responses for this cursor.
    stream: tonic::Streaming<pb::CursorResponse>,
    // When true, `value()` is refused with `KeysOnly`.
    keys_only: bool,
    // True when the cursor was opened on an index; affects key encoding and decoding.
    index_cursor: bool,
    // Most recently received entry, if the cursor is positioned.
    entry: Option<pb::CursorEntry>,
    // Set once the server reports exhaustion; later moves return `false` without a round trip.
    done: bool,
}
926
927impl Cursor {
928 pub fn key(&self) -> Option<serde_json::Value> {
930 let entry = self.entry.as_ref()?;
931 match entry.key.len() {
932 0 => None,
933 1 if !self.index_cursor => Some(key_value_to_json(&entry.key[0])),
934 _ => Some(serde_json::Value::Array(
935 entry.key.iter().map(key_value_to_json).collect(),
936 )),
937 }
938 }
939
940 pub fn primary_key(&self) -> &str {
942 self.entry
943 .as_ref()
944 .map(|e| e.primary_key.as_str())
945 .unwrap_or("")
946 }
947
948 pub fn value(&self) -> Result<Record, IndexedDBError> {
950 if self.keys_only {
951 return Err(IndexedDBError::KeysOnly);
952 }
953 let entry = self.entry.as_ref().ok_or(IndexedDBError::NotFound)?;
954 Ok(entry
955 .record
956 .as_ref()
957 .map(pb_record_to_record)
958 .unwrap_or_default())
959 }
960
961 pub async fn continue_next(&mut self) -> Result<bool, IndexedDBError> {
963 let cmd = pb::cursor_command::Command::Next(true);
964 self.send_and_recv(cmd).await
965 }
966
967 pub async fn continue_to_key(
969 &mut self,
970 key: serde_json::Value,
971 ) -> Result<bool, IndexedDBError> {
972 let cmd = pb::cursor_command::Command::ContinueToKey(pb::CursorKeyTarget {
973 key: cursor_key_to_proto(&key, self.index_cursor),
974 });
975 self.send_and_recv(cmd).await
976 }
977
978 pub async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError> {
980 let cmd = pb::cursor_command::Command::Advance(count);
981 self.send_and_recv(cmd).await
982 }
983
984 pub async fn delete(&mut self) -> Result<(), IndexedDBError> {
986 if self.done {
987 return Err(IndexedDBError::NotFound);
988 }
989 let cmd = pb::cursor_command::Command::Delete(true);
990 self.send_mutation(cmd).await
991 }
992
993 pub async fn update(&mut self, value: Record) -> Result<(), IndexedDBError> {
995 if self.done {
996 return Err(IndexedDBError::NotFound);
997 }
998 let cmd = pb::cursor_command::Command::Update(record_to_pb_record(value));
999 self.send_mutation(cmd).await
1000 }
1001
1002 pub async fn close(self) -> Result<(), IndexedDBError> {
1004 let msg = pb::CursorClientMessage {
1005 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
1006 command: Some(pb::cursor_command::Command::Close(true)),
1007 })),
1008 };
1009 self.tx
1010 .send(msg)
1011 .await
1012 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1013 Ok(())
1014 }
1015
1016 async fn send_mutation(
1017 &mut self,
1018 cmd: pb::cursor_command::Command,
1019 ) -> Result<(), IndexedDBError> {
1020 let msg = pb::CursorClientMessage {
1021 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
1022 command: Some(cmd),
1023 })),
1024 };
1025 self.tx
1026 .send(msg)
1027 .await
1028 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1029 let resp = self
1031 .stream
1032 .message()
1033 .await
1034 .map_err(map_status)?
1035 .ok_or_else(|| {
1036 IndexedDBError::Status(tonic::Status::internal(
1037 "cursor stream ended during mutation",
1038 ))
1039 })?;
1040 match resp.result {
1041 Some(pb::cursor_response::Result::Entry(entry)) => {
1042 self.entry = Some(entry);
1043 }
1044 Some(pb::cursor_response::Result::Done(_)) => {}
1045 None => {
1046 return Err(IndexedDBError::Status(tonic::Status::internal(
1047 "unexpected cursor mutation ack",
1048 )));
1049 }
1050 }
1051 Ok(())
1052 }
1053
1054 async fn send_and_recv(
1055 &mut self,
1056 cmd: pb::cursor_command::Command,
1057 ) -> Result<bool, IndexedDBError> {
1058 if self.done {
1059 return Ok(false);
1060 }
1061 let msg = pb::CursorClientMessage {
1062 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
1063 command: Some(cmd),
1064 })),
1065 };
1066 self.tx
1067 .send(msg)
1068 .await
1069 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1070
1071 let resp = self
1072 .stream
1073 .message()
1074 .await
1075 .map_err(map_status)?
1076 .ok_or_else(|| {
1077 IndexedDBError::Status(tonic::Status::internal("cursor stream ended"))
1078 })?;
1079
1080 match resp.result {
1081 Some(pb::cursor_response::Result::Entry(entry)) => {
1082 self.entry = Some(entry);
1083 self.done = false;
1084 Ok(true)
1085 }
1086 Some(pb::cursor_response::Result::Done(exhausted)) => {
1087 if exhausted {
1088 self.done = true;
1089 }
1090 self.entry = None;
1091 Ok(false)
1092 }
1093 None => {
1094 self.entry = None;
1095 self.done = true;
1096 Ok(false)
1097 }
1098 }
1099 }
1100}
1101
1102async fn open_cursor_inner(
1103 client: &mut IndexedDbClient<IndexedDbTransport>,
1104 req: pb::OpenCursorRequest,
1105) -> Result<Cursor, IndexedDBError> {
1106 let keys_only = req.keys_only;
1107 let is_index = !req.index.is_empty();
1108 let (tx, rx) = mpsc::channel::<pb::CursorClientMessage>(CURSOR_CHANNEL_BUFFER);
1109
1110 let open_msg = pb::CursorClientMessage {
1111 msg: Some(pb::cursor_client_message::Msg::Open(req)),
1112 };
1113 tx.send(open_msg)
1114 .await
1115 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1116
1117 let receiver_stream = ReceiverStream::new(rx);
1118 let mut stream = client
1119 .open_cursor(receiver_stream)
1120 .await
1121 .map_err(map_status)?
1122 .into_inner();
1123
1124 let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
1126 IndexedDBError::Status(tonic::Status::internal("cursor stream ended during open"))
1127 })?;
1128 match ack.result {
1129 Some(pb::cursor_response::Result::Done(false)) => {}
1130 Some(pb::cursor_response::Result::Done(true)) => {
1131 return Err(IndexedDBError::Status(tonic::Status::internal(
1132 "unexpected exhausted cursor open ack",
1133 )));
1134 }
1135 _ => {
1136 return Err(IndexedDBError::Status(tonic::Status::internal(
1137 "unexpected cursor open ack",
1138 )));
1139 }
1140 }
1141
1142 Ok(Cursor {
1143 tx,
1144 stream,
1145 keys_only,
1146 entry: None,
1147 done: false,
1148 index_cursor: is_index,
1149 })
1150}
1151
/// Client for the host-provided IndexedDB gRPC service.
pub struct IndexedDB {
    // Generated gRPC client over the intercepted transport; cloned for each
    // object-store handle and transaction.
    client: IndexedDbClient<IndexedDbTransport>,
}
1156
1157impl IndexedDB {
1158 pub async fn connect() -> Result<Self, IndexedDBError> {
1160 Self::connect_named("").await
1161 }
1162
1163 pub async fn connect_named(name: &str) -> Result<Self, IndexedDBError> {
1165 let target = std::env::var(ENV_HOST_SERVICE_SOCKET)
1166 .map_err(|_| IndexedDBError::Env(format!("{ENV_HOST_SERVICE_SOCKET} is not set")))?;
1167 let token = std::env::var(ENV_HOST_SERVICE_TOKEN).unwrap_or_default();
1168 let channel = match parse_indexeddb_target(&target)? {
1169 IndexedDBTarget::Unix(path) => {
1170 Endpoint::try_from("http://[::]:50051")?
1171 .connect_with_connector(service_fn(move |_: Uri| {
1172 let path = path.clone();
1173 async move {
1174 tokio::net::UnixStream::connect(path)
1175 .await
1176 .map(TokioIo::new)
1177 }
1178 }))
1179 .await?
1180 }
1181 IndexedDBTarget::Tcp(address) => {
1182 Endpoint::from_shared(format!("http://{address}"))?
1183 .connect()
1184 .await?
1185 }
1186 IndexedDBTarget::Tls(address) => {
1187 Endpoint::from_shared(format!("https://{address}"))?
1188 .tls_config(ClientTlsConfig::new().with_native_roots())?
1189 .connect()
1190 .await?
1191 }
1192 };
1193
1194 let client = IndexedDbClient::with_interceptor(
1195 channel,
1196 relay_token_interceptor(token.trim(), name)?,
1197 );
1198
1199 Ok(Self { client })
1200 }
1201
1202 pub async fn create_object_store(
1204 &mut self,
1205 name: &str,
1206 schema: ObjectStoreSchema,
1207 ) -> Result<ObjectStore, IndexedDBError> {
1208 let indexes = schema
1209 .indexes
1210 .into_iter()
1211 .map(|idx| pb::IndexSchema {
1212 name: idx.name,
1213 key_path: idx.key_path,
1214 unique: idx.unique,
1215 })
1216 .collect();
1217 self.client
1218 .create_object_store(pb::CreateObjectStoreRequest {
1219 name: name.to_string(),
1220 schema: Some(pb::ObjectStoreSchema {
1221 indexes,
1222 columns: vec![],
1223 }),
1224 })
1225 .await
1226 .map_err(map_status)?;
1227 Ok(self.object_store(name))
1228 }
1229
1230 pub async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError> {
1232 self.client
1233 .delete_object_store(pb::DeleteObjectStoreRequest {
1234 name: name.to_string(),
1235 })
1236 .await
1237 .map_err(map_status)?;
1238 Ok(())
1239 }
1240
1241 pub fn object_store(&self, name: &str) -> ObjectStore {
1243 ObjectStore {
1244 client: self.client.clone(),
1245 store: name.to_string(),
1246 }
1247 }
1248
1249 pub async fn transaction(
1251 &self,
1252 stores: &[&str],
1253 mode: TransactionMode,
1254 options: TransactionOptions,
1255 ) -> Result<Transaction, IndexedDBError> {
1256 let (tx, rx) = mpsc::channel::<pb::TransactionClientMessage>(TRANSACTION_CHANNEL_BUFFER);
1257 tx.send(pb::TransactionClientMessage {
1258 msg: Some(pb::transaction_client_message::Msg::Begin(
1259 pb::BeginTransactionRequest {
1260 stores: stores.iter().map(|store| store.to_string()).collect(),
1261 mode: mode.to_proto(),
1262 durability_hint: options.durability_hint.to_proto(),
1263 },
1264 )),
1265 })
1266 .await
1267 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1268
1269 let receiver_stream = ReceiverStream::new(rx);
1270 let mut client = self.client.clone();
1271 let mut stream = client
1272 .transaction(receiver_stream)
1273 .await
1274 .map_err(map_status)?
1275 .into_inner();
1276
1277 let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
1278 IndexedDBError::Transaction("transaction stream ended during begin".to_string())
1279 })?;
1280 match ack.msg {
1281 Some(pb::transaction_server_message::Msg::Begin(_)) => {}
1282 _ => {
1283 return Err(IndexedDBError::Transaction(
1284 "expected transaction begin response".to_string(),
1285 ));
1286 }
1287 }
1288
1289 Ok(Transaction {
1290 tx: Some(tx),
1291 stream,
1292 request_id: 0,
1293 closed: false,
1294 })
1295 }
1296}
1297
/// Client side of a bidirectional transaction stream with the host service.
pub struct Transaction {
    // Request-side sender; taken (set to `None`) once commit or abort has been sent.
    tx: Option<mpsc::Sender<pb::TransactionClientMessage>>,
    // Server messages for this transaction.
    stream: tonic::Streaming<pb::TransactionServerMessage>,
    // Starts at 0. NOTE(review): presumably a counter for correlating requests;
    // its use is not visible in this section.
    request_id: u64,
    // Set once commit or abort has been sent.
    closed: bool,
}
1305
1306impl Transaction {
    /// Returns a handle to the object store `name` that mutably borrows this
    /// transaction for `'a`. No request is sent.
    pub fn object_store<'a>(&'a mut self, name: &str) -> TransactionObjectStore<'a> {
        TransactionObjectStore {
            tx: self,
            store: name.to_string(),
        }
    }
1314
1315 pub async fn commit(&mut self) -> Result<(), IndexedDBError> {
1317 self.ensure_open()?;
1318 let tx = self.tx.as_ref().ok_or_else(|| {
1319 IndexedDBError::Transaction("transaction is already finished".to_string())
1320 })?;
1321 tx.send(pb::TransactionClientMessage {
1322 msg: Some(pb::transaction_client_message::Msg::Commit(
1323 pb::TransactionCommitRequest {},
1324 )),
1325 })
1326 .await
1327 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1328 self.closed = true;
1329 self.tx.take();
1330
1331 let resp = self
1332 .stream
1333 .message()
1334 .await
1335 .map_err(map_status)?
1336 .ok_or_else(|| {
1337 IndexedDBError::Transaction("transaction stream ended during commit".to_string())
1338 })?;
1339 match resp.msg {
1340 Some(pb::transaction_server_message::Msg::Commit(commit)) => {
1341 map_rpc_status(commit.error)
1342 }
1343 _ => Err(IndexedDBError::Transaction(
1344 "expected transaction commit response".to_string(),
1345 )),
1346 }
1347 }
1348
1349 pub async fn abort(&mut self, reason: &str) -> Result<(), IndexedDBError> {
1351 if self.closed {
1352 return Ok(());
1353 }
1354 let tx = self.tx.as_ref().ok_or_else(|| {
1355 IndexedDBError::Transaction("transaction is already finished".to_string())
1356 })?;
1357 tx.send(pb::TransactionClientMessage {
1358 msg: Some(pb::transaction_client_message::Msg::Abort(
1359 pb::TransactionAbortRequest {
1360 reason: reason.to_string(),
1361 },
1362 )),
1363 })
1364 .await
1365 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1366 self.closed = true;
1367 self.tx.take();
1368
1369 let resp = self
1370 .stream
1371 .message()
1372 .await
1373 .map_err(map_status)?
1374 .ok_or_else(|| {
1375 IndexedDBError::Transaction("transaction stream ended during abort".to_string())
1376 })?;
1377 match resp.msg {
1378 Some(pb::transaction_server_message::Msg::Abort(abort)) => map_rpc_status(abort.error),
1379 _ => Err(IndexedDBError::Transaction(
1380 "expected transaction abort response".to_string(),
1381 )),
1382 }
1383 }
1384
1385 async fn send_operation(
1386 &mut self,
1387 operation: pb::transaction_operation::Operation,
1388 ) -> Result<pb::TransactionOperationResponse, IndexedDBError> {
1389 self.ensure_open()?;
1390 self.request_id += 1;
1391 let request_id = self.request_id;
1392 let tx = self.tx.as_ref().ok_or_else(|| {
1393 IndexedDBError::Transaction("transaction is already finished".to_string())
1394 })?;
1395 tx.send(pb::TransactionClientMessage {
1396 msg: Some(pb::transaction_client_message::Msg::Operation(
1397 pb::TransactionOperation {
1398 request_id,
1399 operation: Some(operation),
1400 },
1401 )),
1402 })
1403 .await
1404 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
1405
1406 let resp = self
1407 .stream
1408 .message()
1409 .await
1410 .map_err(map_status)?
1411 .ok_or_else(|| {
1412 IndexedDBError::Transaction("transaction stream ended during operation".to_string())
1413 })?;
1414 let op = match resp.msg {
1415 Some(pb::transaction_server_message::Msg::Operation(op)) => op,
1416 _ => {
1417 self.close_locally();
1418 return Err(IndexedDBError::Transaction(
1419 "expected transaction operation response".to_string(),
1420 ));
1421 }
1422 };
1423 if op.request_id != request_id {
1424 self.close_locally();
1425 return Err(IndexedDBError::Transaction(
1426 "transaction response request id mismatch".to_string(),
1427 ));
1428 }
1429 if let Err(err) = map_rpc_status(op.error.clone()) {
1430 self.close_locally();
1431 return Err(err);
1432 }
1433 Ok(op)
1434 }
1435
1436 fn ensure_open(&self) -> Result<(), IndexedDBError> {
1437 if self.closed {
1438 return Err(IndexedDBError::Transaction(
1439 "transaction is already finished".to_string(),
1440 ));
1441 }
1442 Ok(())
1443 }
1444
1445 fn close_locally(&mut self) {
1446 self.closed = true;
1447 self.tx.take();
1448 }
1449}
1450
/// Object-store handle whose operations are sent through a borrowed [`Transaction`].
pub struct TransactionObjectStore<'a> {
    /// Transaction that carries every operation issued through this handle.
    tx: &'a mut Transaction,
    /// Name of the object store placed in each request.
    store: String,
}
1456
1457impl TransactionObjectStore<'_> {
1458 pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
1460 let resp = self
1461 .tx
1462 .send_operation(pb::transaction_operation::Operation::Get(
1463 pb::ObjectStoreRequest {
1464 store: self.store.clone(),
1465 id: id.to_string(),
1466 },
1467 ))
1468 .await?;
1469 match resp.result {
1470 Some(pb::transaction_operation_response::Result::Record(record)) => Ok(record
1471 .record
1472 .as_ref()
1473 .map(pb_record_to_record)
1474 .unwrap_or_default()),
1475 _ => Err(unexpected_transaction_result()),
1476 }
1477 }
1478
1479 pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
1481 let resp = self
1482 .tx
1483 .send_operation(pb::transaction_operation::Operation::GetKey(
1484 pb::ObjectStoreRequest {
1485 store: self.store.clone(),
1486 id: id.to_string(),
1487 },
1488 ))
1489 .await?;
1490 match resp.result {
1491 Some(pb::transaction_operation_response::Result::Key(key)) => Ok(key.key),
1492 _ => Err(unexpected_transaction_result()),
1493 }
1494 }
1495
1496 pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
1498 self.tx
1499 .send_operation(pb::transaction_operation::Operation::Add(
1500 pb::RecordRequest {
1501 store: self.store.clone(),
1502 record: Some(record_to_pb_record(record)),
1503 },
1504 ))
1505 .await?;
1506 Ok(())
1507 }
1508
1509 pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
1511 self.tx
1512 .send_operation(pb::transaction_operation::Operation::Put(
1513 pb::RecordRequest {
1514 store: self.store.clone(),
1515 record: Some(record_to_pb_record(record)),
1516 },
1517 ))
1518 .await?;
1519 Ok(())
1520 }
1521
1522 pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
1524 self.tx
1525 .send_operation(pb::transaction_operation::Operation::Delete(
1526 pb::ObjectStoreRequest {
1527 store: self.store.clone(),
1528 id: id.to_string(),
1529 },
1530 ))
1531 .await?;
1532 Ok(())
1533 }
1534
1535 pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
1537 self.tx
1538 .send_operation(pb::transaction_operation::Operation::Clear(
1539 pb::ObjectStoreNameRequest {
1540 store: self.store.clone(),
1541 },
1542 ))
1543 .await?;
1544 Ok(())
1545 }
1546
1547 pub async fn get_all(
1549 &mut self,
1550 range: Option<KeyRange>,
1551 ) -> Result<Vec<Record>, IndexedDBError> {
1552 let resp = self
1553 .tx
1554 .send_operation(pb::transaction_operation::Operation::GetAll(
1555 pb::ObjectStoreRangeRequest {
1556 store: self.store.clone(),
1557 range: range.map(key_range_to_pb),
1558 },
1559 ))
1560 .await?;
1561 match resp.result {
1562 Some(pb::transaction_operation_response::Result::Records(records)) => {
1563 Ok(records.records.iter().map(pb_record_to_record).collect())
1564 }
1565 _ => Err(unexpected_transaction_result()),
1566 }
1567 }
1568
1569 pub async fn get_all_keys(
1571 &mut self,
1572 range: Option<KeyRange>,
1573 ) -> Result<Vec<String>, IndexedDBError> {
1574 let resp = self
1575 .tx
1576 .send_operation(pb::transaction_operation::Operation::GetAllKeys(
1577 pb::ObjectStoreRangeRequest {
1578 store: self.store.clone(),
1579 range: range.map(key_range_to_pb),
1580 },
1581 ))
1582 .await?;
1583 match resp.result {
1584 Some(pb::transaction_operation_response::Result::Keys(keys)) => Ok(keys.keys),
1585 _ => Err(unexpected_transaction_result()),
1586 }
1587 }
1588
1589 pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
1591 let resp = self
1592 .tx
1593 .send_operation(pb::transaction_operation::Operation::Count(
1594 pb::ObjectStoreRangeRequest {
1595 store: self.store.clone(),
1596 range: range.map(key_range_to_pb),
1597 },
1598 ))
1599 .await?;
1600 match resp.result {
1601 Some(pb::transaction_operation_response::Result::Count(count)) => Ok(count.count),
1602 _ => Err(unexpected_transaction_result()),
1603 }
1604 }
1605
1606 pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
1608 let resp = self
1609 .tx
1610 .send_operation(pb::transaction_operation::Operation::DeleteRange(
1611 pb::ObjectStoreRangeRequest {
1612 store: self.store.clone(),
1613 range: Some(key_range_to_pb(range)),
1614 },
1615 ))
1616 .await?;
1617 match resp.result {
1618 Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
1619 Ok(deleted.deleted)
1620 }
1621 _ => Err(unexpected_transaction_result()),
1622 }
1623 }
1624
1625 pub fn index<'a>(&'a mut self, name: &str) -> TransactionIndex<'a> {
1627 TransactionIndex {
1628 tx: &mut *self.tx,
1629 store: self.store.clone(),
1630 index: name.to_string(),
1631 }
1632 }
1633}
1634
/// Index handle whose queries are sent through a borrowed [`Transaction`].
pub struct TransactionIndex<'a> {
    /// Transaction that carries every query issued through this handle.
    tx: &'a mut Transaction,
    /// Name of the object store placed in each request.
    store: String,
    /// Name of the index placed in each request.
    index: String,
}
1641
1642impl TransactionIndex<'_> {
1643 pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
1645 let resp = self
1646 .tx
1647 .send_operation(pb::transaction_operation::Operation::IndexGet(
1648 self.index_request(values, None),
1649 ))
1650 .await?;
1651 match resp.result {
1652 Some(pb::transaction_operation_response::Result::Record(record)) => Ok(record
1653 .record
1654 .as_ref()
1655 .map(pb_record_to_record)
1656 .unwrap_or_default()),
1657 _ => Err(unexpected_transaction_result()),
1658 }
1659 }
1660
1661 pub async fn get_key(
1663 &mut self,
1664 values: &[serde_json::Value],
1665 ) -> Result<String, IndexedDBError> {
1666 let resp = self
1667 .tx
1668 .send_operation(pb::transaction_operation::Operation::IndexGetKey(
1669 self.index_request(values, None),
1670 ))
1671 .await?;
1672 match resp.result {
1673 Some(pb::transaction_operation_response::Result::Key(key)) => Ok(key.key),
1674 _ => Err(unexpected_transaction_result()),
1675 }
1676 }
1677
1678 pub async fn get_all(
1680 &mut self,
1681 values: &[serde_json::Value],
1682 range: Option<KeyRange>,
1683 ) -> Result<Vec<Record>, IndexedDBError> {
1684 let resp = self
1685 .tx
1686 .send_operation(pb::transaction_operation::Operation::IndexGetAll(
1687 self.index_request(values, range),
1688 ))
1689 .await?;
1690 match resp.result {
1691 Some(pb::transaction_operation_response::Result::Records(records)) => {
1692 Ok(records.records.iter().map(pb_record_to_record).collect())
1693 }
1694 _ => Err(unexpected_transaction_result()),
1695 }
1696 }
1697
1698 pub async fn get_all_keys(
1700 &mut self,
1701 values: &[serde_json::Value],
1702 range: Option<KeyRange>,
1703 ) -> Result<Vec<String>, IndexedDBError> {
1704 let resp = self
1705 .tx
1706 .send_operation(pb::transaction_operation::Operation::IndexGetAllKeys(
1707 self.index_request(values, range),
1708 ))
1709 .await?;
1710 match resp.result {
1711 Some(pb::transaction_operation_response::Result::Keys(keys)) => Ok(keys.keys),
1712 _ => Err(unexpected_transaction_result()),
1713 }
1714 }
1715
1716 pub async fn count(
1718 &mut self,
1719 values: &[serde_json::Value],
1720 range: Option<KeyRange>,
1721 ) -> Result<i64, IndexedDBError> {
1722 let resp = self
1723 .tx
1724 .send_operation(pb::transaction_operation::Operation::IndexCount(
1725 self.index_request(values, range),
1726 ))
1727 .await?;
1728 match resp.result {
1729 Some(pb::transaction_operation_response::Result::Count(count)) => Ok(count.count),
1730 _ => Err(unexpected_transaction_result()),
1731 }
1732 }
1733
1734 pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
1736 let resp = self
1737 .tx
1738 .send_operation(pb::transaction_operation::Operation::IndexDelete(
1739 self.index_request(values, None),
1740 ))
1741 .await?;
1742 match resp.result {
1743 Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
1744 Ok(deleted.deleted)
1745 }
1746 _ => Err(unexpected_transaction_result()),
1747 }
1748 }
1749
1750 pub async fn delete_range(
1752 &mut self,
1753 values: &[serde_json::Value],
1754 range: KeyRange,
1755 ) -> Result<i64, IndexedDBError> {
1756 let resp = self
1757 .tx
1758 .send_operation(pb::transaction_operation::Operation::IndexDelete(
1759 self.index_request(values, Some(range)),
1760 ))
1761 .await?;
1762 match resp.result {
1763 Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
1764 Ok(deleted.deleted)
1765 }
1766 _ => Err(unexpected_transaction_result()),
1767 }
1768 }
1769
1770 fn index_request(
1771 &self,
1772 values: &[serde_json::Value],
1773 range: Option<KeyRange>,
1774 ) -> pb::IndexQueryRequest {
1775 pb::IndexQueryRequest {
1776 store: self.store.clone(),
1777 index: self.index.clone(),
1778 values: values.iter().map(json_to_typed_value).collect(),
1779 range: range.map(key_range_to_pb),
1780 }
1781 }
1782}
1783
/// Transport target produced by `parse_indexeddb_target`.
enum IndexedDBTarget {
    /// Socket path, taken from a `unix://` target or from a target with no scheme.
    Unix(String),
    /// Address that followed a `tcp://` prefix.
    Tcp(String),
    /// Address that followed a `tls://` prefix.
    Tls(String),
}
1789
1790fn parse_indexeddb_target(raw_target: &str) -> Result<IndexedDBTarget, IndexedDBError> {
1791 let target = raw_target.trim();
1792 if target.is_empty() {
1793 return Err(IndexedDBError::Env(
1794 "IndexedDB transport target is required".to_string(),
1795 ));
1796 }
1797 if let Some(address) = target.strip_prefix("tcp://") {
1798 let address = address.trim();
1799 if address.is_empty() {
1800 return Err(IndexedDBError::Env(format!(
1801 "IndexedDB tcp target {raw_target:?} is missing host:port"
1802 )));
1803 }
1804 return Ok(IndexedDBTarget::Tcp(address.to_string()));
1805 }
1806 if let Some(address) = target.strip_prefix("tls://") {
1807 let address = address.trim();
1808 if address.is_empty() {
1809 return Err(IndexedDBError::Env(format!(
1810 "IndexedDB tls target {raw_target:?} is missing host:port"
1811 )));
1812 }
1813 return Ok(IndexedDBTarget::Tls(address.to_string()));
1814 }
1815 if let Some(path) = target.strip_prefix("unix://") {
1816 let path = path.trim();
1817 if path.is_empty() {
1818 return Err(IndexedDBError::Env(format!(
1819 "IndexedDB unix target {raw_target:?} is missing a socket path"
1820 )));
1821 }
1822 return Ok(IndexedDBTarget::Unix(path.to_string()));
1823 }
1824 if target.contains("://") {
1825 let scheme = target.split("://").next().unwrap_or_default();
1826 return Err(IndexedDBError::Env(format!(
1827 "unsupported IndexedDB target scheme {scheme:?}"
1828 )));
1829 }
1830 Ok(IndexedDBTarget::Unix(target.to_string()))
1831}
1832
/// Handle that issues requests against one named object store.
pub struct ObjectStore {
    /// gRPC client used for every request made through this handle.
    client: IndexedDbClient<IndexedDbTransport>,
    /// Name of the object store placed in each request.
    store: String,
}
1838
1839impl ObjectStore {
1840 pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
1842 let resp = self
1843 .client
1844 .get(pb::ObjectStoreRequest {
1845 store: self.store.clone(),
1846 id: id.to_string(),
1847 })
1848 .await
1849 .map_err(map_status)?;
1850 Ok(resp
1851 .into_inner()
1852 .record
1853 .as_ref()
1854 .map(pb_record_to_record)
1855 .unwrap_or_default())
1856 }
1857
1858 pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
1860 let resp = self
1861 .client
1862 .get_key(pb::ObjectStoreRequest {
1863 store: self.store.clone(),
1864 id: id.to_string(),
1865 })
1866 .await
1867 .map_err(map_status)?;
1868 Ok(resp.into_inner().key)
1869 }
1870
1871 pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
1873 self.client
1874 .add(pb::RecordRequest {
1875 store: self.store.clone(),
1876 record: Some(record_to_pb_record(record)),
1877 })
1878 .await
1879 .map_err(map_status)?;
1880 Ok(())
1881 }
1882
1883 pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
1885 self.client
1886 .put(pb::RecordRequest {
1887 store: self.store.clone(),
1888 record: Some(record_to_pb_record(record)),
1889 })
1890 .await
1891 .map_err(map_status)?;
1892 Ok(())
1893 }
1894
1895 pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
1897 self.client
1898 .delete(pb::ObjectStoreRequest {
1899 store: self.store.clone(),
1900 id: id.to_string(),
1901 })
1902 .await
1903 .map_err(map_status)?;
1904 Ok(())
1905 }
1906
1907 pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
1909 self.client
1910 .clear(pb::ObjectStoreNameRequest {
1911 store: self.store.clone(),
1912 })
1913 .await
1914 .map_err(map_status)?;
1915 Ok(())
1916 }
1917
1918 pub async fn get_all(
1920 &mut self,
1921 range: Option<KeyRange>,
1922 ) -> Result<Vec<Record>, IndexedDBError> {
1923 let resp = self
1924 .client
1925 .get_all(pb::ObjectStoreRangeRequest {
1926 store: self.store.clone(),
1927 range: range.map(key_range_to_pb),
1928 })
1929 .await
1930 .map_err(map_status)?;
1931 Ok(resp
1932 .into_inner()
1933 .records
1934 .iter()
1935 .map(pb_record_to_record)
1936 .collect())
1937 }
1938
1939 pub async fn get_all_keys(
1941 &mut self,
1942 range: Option<KeyRange>,
1943 ) -> Result<Vec<String>, IndexedDBError> {
1944 let resp = self
1945 .client
1946 .get_all_keys(pb::ObjectStoreRangeRequest {
1947 store: self.store.clone(),
1948 range: range.map(key_range_to_pb),
1949 })
1950 .await
1951 .map_err(map_status)?;
1952 Ok(resp.into_inner().keys)
1953 }
1954
1955 pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
1957 let resp = self
1958 .client
1959 .count(pb::ObjectStoreRangeRequest {
1960 store: self.store.clone(),
1961 range: range.map(key_range_to_pb),
1962 })
1963 .await
1964 .map_err(map_status)?;
1965 Ok(resp.into_inner().count)
1966 }
1967
1968 pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
1970 let resp = self
1971 .client
1972 .delete_range(pb::ObjectStoreRangeRequest {
1973 store: self.store.clone(),
1974 range: Some(key_range_to_pb(range)),
1975 })
1976 .await
1977 .map_err(map_status)?;
1978 Ok(resp.into_inner().deleted)
1979 }
1980
1981 pub fn index(&self, name: &str) -> Index {
1983 Index {
1984 client: self.client.clone(),
1985 store: self.store.clone(),
1986 index: name.to_string(),
1987 }
1988 }
1989
1990 pub async fn open_cursor(
1992 &mut self,
1993 range: Option<KeyRange>,
1994 direction: CursorDirection,
1995 ) -> Result<Cursor, IndexedDBError> {
1996 let req = pb::OpenCursorRequest {
1997 store: self.store.clone(),
1998 range: range.map(key_range_to_pb),
1999 direction: direction.to_proto(),
2000 keys_only: false,
2001 index: String::new(),
2002 values: vec![],
2003 };
2004 open_cursor_inner(&mut self.client, req).await
2005 }
2006
2007 pub async fn open_key_cursor(
2009 &mut self,
2010 range: Option<KeyRange>,
2011 direction: CursorDirection,
2012 ) -> Result<Cursor, IndexedDBError> {
2013 let req = pb::OpenCursorRequest {
2014 store: self.store.clone(),
2015 range: range.map(key_range_to_pb),
2016 direction: direction.to_proto(),
2017 keys_only: true,
2018 index: String::new(),
2019 values: vec![],
2020 };
2021 open_cursor_inner(&mut self.client, req).await
2022 }
2023}
2024
/// Handle that issues requests against one index of an object store.
pub struct Index {
    /// gRPC client used for every request made through this handle.
    client: IndexedDbClient<IndexedDbTransport>,
    /// Name of the object store placed in each request.
    store: String,
    /// Name of the index placed in each request.
    index: String,
}
2031
2032impl Index {
2033 pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
2035 let resp = self
2036 .client
2037 .index_get(pb::IndexQueryRequest {
2038 store: self.store.clone(),
2039 index: self.index.clone(),
2040 values: values.iter().map(json_to_typed_value).collect(),
2041 range: None,
2042 })
2043 .await
2044 .map_err(map_status)?;
2045 Ok(resp
2046 .into_inner()
2047 .record
2048 .as_ref()
2049 .map(pb_record_to_record)
2050 .unwrap_or_default())
2051 }
2052
2053 pub async fn get_key(
2055 &mut self,
2056 values: &[serde_json::Value],
2057 ) -> Result<String, IndexedDBError> {
2058 let resp = self
2059 .client
2060 .index_get_key(pb::IndexQueryRequest {
2061 store: self.store.clone(),
2062 index: self.index.clone(),
2063 values: values.iter().map(json_to_typed_value).collect(),
2064 range: None,
2065 })
2066 .await
2067 .map_err(map_status)?;
2068 Ok(resp.into_inner().key)
2069 }
2070
2071 pub async fn get_all(
2073 &mut self,
2074 values: &[serde_json::Value],
2075 range: Option<KeyRange>,
2076 ) -> Result<Vec<Record>, IndexedDBError> {
2077 let resp = self
2078 .client
2079 .index_get_all(pb::IndexQueryRequest {
2080 store: self.store.clone(),
2081 index: self.index.clone(),
2082 values: values.iter().map(json_to_typed_value).collect(),
2083 range: range.map(key_range_to_pb),
2084 })
2085 .await
2086 .map_err(map_status)?;
2087 Ok(resp
2088 .into_inner()
2089 .records
2090 .iter()
2091 .map(pb_record_to_record)
2092 .collect())
2093 }
2094
2095 pub async fn get_all_keys(
2097 &mut self,
2098 values: &[serde_json::Value],
2099 range: Option<KeyRange>,
2100 ) -> Result<Vec<String>, IndexedDBError> {
2101 let resp = self
2102 .client
2103 .index_get_all_keys(pb::IndexQueryRequest {
2104 store: self.store.clone(),
2105 index: self.index.clone(),
2106 values: values.iter().map(json_to_typed_value).collect(),
2107 range: range.map(key_range_to_pb),
2108 })
2109 .await
2110 .map_err(map_status)?;
2111 Ok(resp.into_inner().keys)
2112 }
2113
2114 pub async fn count(
2116 &mut self,
2117 values: &[serde_json::Value],
2118 range: Option<KeyRange>,
2119 ) -> Result<i64, IndexedDBError> {
2120 let resp = self
2121 .client
2122 .index_count(pb::IndexQueryRequest {
2123 store: self.store.clone(),
2124 index: self.index.clone(),
2125 values: values.iter().map(json_to_typed_value).collect(),
2126 range: range.map(key_range_to_pb),
2127 })
2128 .await
2129 .map_err(map_status)?;
2130 Ok(resp.into_inner().count)
2131 }
2132
2133 pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
2135 let resp = self
2136 .client
2137 .index_delete(pb::IndexQueryRequest {
2138 store: self.store.clone(),
2139 index: self.index.clone(),
2140 values: values.iter().map(json_to_typed_value).collect(),
2141 range: None,
2142 })
2143 .await
2144 .map_err(map_status)?;
2145 Ok(resp.into_inner().deleted)
2146 }
2147
2148 pub async fn delete_range(
2150 &mut self,
2151 values: &[serde_json::Value],
2152 range: KeyRange,
2153 ) -> Result<i64, IndexedDBError> {
2154 let resp = self
2155 .client
2156 .index_delete(pb::IndexQueryRequest {
2157 store: self.store.clone(),
2158 index: self.index.clone(),
2159 values: values.iter().map(json_to_typed_value).collect(),
2160 range: Some(key_range_to_pb(range)),
2161 })
2162 .await
2163 .map_err(map_status)?;
2164 Ok(resp.into_inner().deleted)
2165 }
2166
2167 pub async fn open_cursor(
2169 &mut self,
2170 values: &[serde_json::Value],
2171 range: Option<KeyRange>,
2172 direction: CursorDirection,
2173 ) -> Result<Cursor, IndexedDBError> {
2174 let req = pb::OpenCursorRequest {
2175 store: self.store.clone(),
2176 range: range.map(key_range_to_pb),
2177 direction: direction.to_proto(),
2178 keys_only: false,
2179 index: self.index.clone(),
2180 values: values.iter().map(json_to_typed_value).collect(),
2181 };
2182 open_cursor_inner(&mut self.client, req).await
2183 }
2184
2185 pub async fn open_key_cursor(
2187 &mut self,
2188 values: &[serde_json::Value],
2189 range: Option<KeyRange>,
2190 direction: CursorDirection,
2191 ) -> Result<Cursor, IndexedDBError> {
2192 let req = pb::OpenCursorRequest {
2193 store: self.store.clone(),
2194 range: range.map(key_range_to_pb),
2195 direction: direction.to_proto(),
2196 keys_only: true,
2197 index: self.index.clone(),
2198 values: values.iter().map(json_to_typed_value).collect(),
2199 };
2200 open_cursor_inner(&mut self.client, req).await
2201 }
2202}
2203
2204#[async_trait]
2205impl IndexedDBApi for IndexedDB {
2206 type ObjectStore = ObjectStore;
2207 type Transaction = Transaction;
2208
2209 async fn create_object_store(
2210 &mut self,
2211 name: &str,
2212 schema: ObjectStoreSchema,
2213 ) -> Result<ObjectStore, IndexedDBError> {
2214 IndexedDB::create_object_store(self, name, schema).await
2215 }
2216
2217 async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError> {
2218 IndexedDB::delete_object_store(self, name).await
2219 }
2220
2221 fn object_store(&self, name: &str) -> ObjectStore {
2222 IndexedDB::object_store(self, name)
2223 }
2224
2225 async fn transaction(
2226 &self,
2227 stores: &[&str],
2228 mode: TransactionMode,
2229 options: TransactionOptions,
2230 ) -> Result<Transaction, IndexedDBError> {
2231 IndexedDB::transaction(self, stores, mode, options).await
2232 }
2233}
2234
2235#[async_trait]
2236impl ObjectStoreApi for ObjectStore {
2237 type Index = Index;
2238 type Cursor = Cursor;
2239
2240 async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
2241 ObjectStore::get(self, id).await
2242 }
2243
2244 async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
2245 ObjectStore::get_key(self, id).await
2246 }
2247
2248 async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
2249 ObjectStore::add(self, record).await
2250 }
2251
2252 async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
2253 ObjectStore::put(self, record).await
2254 }
2255
2256 async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
2257 ObjectStore::delete(self, id).await
2258 }
2259
2260 async fn clear(&mut self) -> Result<(), IndexedDBError> {
2261 ObjectStore::clear(self).await
2262 }
2263
2264 async fn get_all(&mut self, range: Option<KeyRange>) -> Result<Vec<Record>, IndexedDBError> {
2265 ObjectStore::get_all(self, range).await
2266 }
2267
2268 async fn get_all_keys(
2269 &mut self,
2270 range: Option<KeyRange>,
2271 ) -> Result<Vec<String>, IndexedDBError> {
2272 ObjectStore::get_all_keys(self, range).await
2273 }
2274
2275 async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
2276 ObjectStore::count(self, range).await
2277 }
2278
2279 async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
2280 ObjectStore::delete_range(self, range).await
2281 }
2282
2283 fn index(&self, name: &str) -> Index {
2284 ObjectStore::index(self, name)
2285 }
2286
2287 async fn open_cursor(
2288 &mut self,
2289 range: Option<KeyRange>,
2290 direction: CursorDirection,
2291 ) -> Result<Cursor, IndexedDBError> {
2292 ObjectStore::open_cursor(self, range, direction).await
2293 }
2294
2295 async fn open_key_cursor(
2296 &mut self,
2297 range: Option<KeyRange>,
2298 direction: CursorDirection,
2299 ) -> Result<Cursor, IndexedDBError> {
2300 ObjectStore::open_key_cursor(self, range, direction).await
2301 }
2302}
2303
2304#[async_trait]
2305impl IndexApi for Index {
2306 type Cursor = Cursor;
2307
2308 async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
2309 Index::get(self, values).await
2310 }
2311
2312 async fn get_key(&mut self, values: &[serde_json::Value]) -> Result<String, IndexedDBError> {
2313 Index::get_key(self, values).await
2314 }
2315
2316 async fn get_all(
2317 &mut self,
2318 values: &[serde_json::Value],
2319 range: Option<KeyRange>,
2320 ) -> Result<Vec<Record>, IndexedDBError> {
2321 Index::get_all(self, values, range).await
2322 }
2323
2324 async fn get_all_keys(
2325 &mut self,
2326 values: &[serde_json::Value],
2327 range: Option<KeyRange>,
2328 ) -> Result<Vec<String>, IndexedDBError> {
2329 Index::get_all_keys(self, values, range).await
2330 }
2331
2332 async fn count(
2333 &mut self,
2334 values: &[serde_json::Value],
2335 range: Option<KeyRange>,
2336 ) -> Result<i64, IndexedDBError> {
2337 Index::count(self, values, range).await
2338 }
2339
2340 async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
2341 Index::delete(self, values).await
2342 }
2343
2344 async fn delete_range(
2345 &mut self,
2346 values: &[serde_json::Value],
2347 range: KeyRange,
2348 ) -> Result<i64, IndexedDBError> {
2349 Index::delete_range(self, values, range).await
2350 }
2351
2352 async fn open_cursor(
2353 &mut self,
2354 values: &[serde_json::Value],
2355 range: Option<KeyRange>,
2356 direction: CursorDirection,
2357 ) -> Result<Cursor, IndexedDBError> {
2358 Index::open_cursor(self, values, range, direction).await
2359 }
2360
2361 async fn open_key_cursor(
2362 &mut self,
2363 values: &[serde_json::Value],
2364 range: Option<KeyRange>,
2365 direction: CursorDirection,
2366 ) -> Result<Cursor, IndexedDBError> {
2367 Index::open_key_cursor(self, values, range, direction).await
2368 }
2369}
2370
2371#[async_trait]
2372impl TransactionApi for Transaction {
2373 type ObjectStore<'a> = TransactionObjectStore<'a>;
2374
2375 fn object_store<'a>(&'a mut self, name: &str) -> TransactionObjectStore<'a> {
2376 Transaction::object_store(self, name)
2377 }
2378
2379 async fn commit(&mut self) -> Result<(), IndexedDBError> {
2380 Transaction::commit(self).await
2381 }
2382
2383 async fn abort(&mut self, reason: &str) -> Result<(), IndexedDBError> {
2384 Transaction::abort(self, reason).await
2385 }
2386}
2387
2388#[async_trait]
2389impl<'tx> TransactionObjectStoreApi for TransactionObjectStore<'tx> {
2390 type Index<'a>
2391 = TransactionIndex<'a>
2392 where
2393 Self: 'a;
2394
2395 async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
2396 TransactionObjectStore::get(self, id).await
2397 }
2398
2399 async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
2400 TransactionObjectStore::get_key(self, id).await
2401 }
2402
2403 async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
2404 TransactionObjectStore::add(self, record).await
2405 }
2406
2407 async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
2408 TransactionObjectStore::put(self, record).await
2409 }
2410
2411 async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
2412 TransactionObjectStore::delete(self, id).await
2413 }
2414
2415 async fn clear(&mut self) -> Result<(), IndexedDBError> {
2416 TransactionObjectStore::clear(self).await
2417 }
2418
2419 async fn get_all(&mut self, range: Option<KeyRange>) -> Result<Vec<Record>, IndexedDBError> {
2420 TransactionObjectStore::get_all(self, range).await
2421 }
2422
2423 async fn get_all_keys(
2424 &mut self,
2425 range: Option<KeyRange>,
2426 ) -> Result<Vec<String>, IndexedDBError> {
2427 TransactionObjectStore::get_all_keys(self, range).await
2428 }
2429
2430 async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
2431 TransactionObjectStore::count(self, range).await
2432 }
2433
2434 async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
2435 TransactionObjectStore::delete_range(self, range).await
2436 }
2437
2438 fn index<'a>(&'a mut self, name: &str) -> TransactionIndex<'a> {
2439 TransactionObjectStore::index(self, name)
2440 }
2441}
2442
2443#[async_trait]
2444impl TransactionIndexApi for TransactionIndex<'_> {
2445 async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
2446 TransactionIndex::get(self, values).await
2447 }
2448
2449 async fn get_key(&mut self, values: &[serde_json::Value]) -> Result<String, IndexedDBError> {
2450 TransactionIndex::get_key(self, values).await
2451 }
2452
2453 async fn get_all(
2454 &mut self,
2455 values: &[serde_json::Value],
2456 range: Option<KeyRange>,
2457 ) -> Result<Vec<Record>, IndexedDBError> {
2458 TransactionIndex::get_all(self, values, range).await
2459 }
2460
2461 async fn get_all_keys(
2462 &mut self,
2463 values: &[serde_json::Value],
2464 range: Option<KeyRange>,
2465 ) -> Result<Vec<String>, IndexedDBError> {
2466 TransactionIndex::get_all_keys(self, values, range).await
2467 }
2468
2469 async fn count(
2470 &mut self,
2471 values: &[serde_json::Value],
2472 range: Option<KeyRange>,
2473 ) -> Result<i64, IndexedDBError> {
2474 TransactionIndex::count(self, values, range).await
2475 }
2476
2477 async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
2478 TransactionIndex::delete(self, values).await
2479 }
2480
2481 async fn delete_range(
2482 &mut self,
2483 values: &[serde_json::Value],
2484 range: KeyRange,
2485 ) -> Result<i64, IndexedDBError> {
2486 TransactionIndex::delete_range(self, values, range).await
2487 }
2488}
2489
2490#[async_trait]
2491impl CursorApi for Cursor {
2492 fn key(&self) -> Option<serde_json::Value> {
2493 Cursor::key(self)
2494 }
2495
2496 fn primary_key(&self) -> &str {
2497 Cursor::primary_key(self)
2498 }
2499
2500 fn value(&self) -> Result<Record, IndexedDBError> {
2501 Cursor::value(self)
2502 }
2503
2504 async fn continue_next(&mut self) -> Result<bool, IndexedDBError> {
2505 Cursor::continue_next(self).await
2506 }
2507
2508 async fn continue_to_key(&mut self, key: serde_json::Value) -> Result<bool, IndexedDBError> {
2509 Cursor::continue_to_key(self, key).await
2510 }
2511
2512 async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError> {
2513 Cursor::advance(self, count).await
2514 }
2515
2516 async fn delete(&mut self) -> Result<(), IndexedDBError> {
2517 Cursor::delete(self).await
2518 }
2519
2520 async fn update(&mut self, value: Record) -> Result<(), IndexedDBError> {
2521 Cursor::update(self, value).await
2522 }
2523
2524 async fn close(self) -> Result<(), IndexedDBError> {
2525 Cursor::close(self).await
2526 }
2527}
2528
2529fn map_status(err: tonic::Status) -> IndexedDBError {
2530 match err.code() {
2531 tonic::Code::NotFound => IndexedDBError::NotFound,
2532 tonic::Code::AlreadyExists => IndexedDBError::AlreadyExists,
2533 tonic::Code::InvalidArgument => IndexedDBError::InvalidArgument(err.message().to_string()),
2534 tonic::Code::FailedPrecondition => IndexedDBError::Transaction(err.message().to_string()),
2535 _ => IndexedDBError::Status(err),
2536 }
2537}
2538
2539fn map_rpc_status(
2540 status: Option<crate::generated::google::rpc::Status>,
2541) -> Result<(), IndexedDBError> {
2542 let Some(status) = status else {
2543 return Ok(());
2544 };
2545 match status.code {
2546 0 => Ok(()),
2547 5 => Err(IndexedDBError::NotFound),
2548 6 => Err(IndexedDBError::AlreadyExists),
2549 3 => Err(IndexedDBError::InvalidArgument(status.message)),
2550 9 => Err(IndexedDBError::Transaction(status.message)),
2551 _ => Err(IndexedDBError::Transaction(status.message)),
2552 }
2553}
2554
2555fn unexpected_transaction_result() -> IndexedDBError {
2556 IndexedDBError::Transaction("unexpected transaction operation result".to_string())
2557}
2558
2559fn record_to_pb_record(record: Record) -> pb::Record {
2560 pb::Record {
2561 fields: record
2562 .into_iter()
2563 .map(|(k, v)| (k, json_to_typed_value(&v)))
2564 .collect(),
2565 }
2566}
2567
2568fn pb_record_to_record(r: &pb::Record) -> Record {
2569 r.fields
2570 .iter()
2571 .map(|(k, v)| (k.clone(), typed_value_to_json(v)))
2572 .collect()
2573}
2574
2575fn json_to_typed_value(v: &serde_json::Value) -> pb::TypedValue {
2576 use pb::typed_value::Kind;
2577 let kind = match v {
2578 serde_json::Value::Null => Kind::NullValue(0),
2579 serde_json::Value::Bool(b) => Kind::BoolValue(*b),
2580 serde_json::Value::Number(n) => {
2581 if let Some(i) = n.as_i64() {
2582 Kind::IntValue(i)
2583 } else {
2584 Kind::FloatValue(n.as_f64().unwrap_or(0.0))
2585 }
2586 }
2587 serde_json::Value::String(s) => Kind::StringValue(s.clone()),
2588 serde_json::Value::Array(arr) => {
2589 let values = arr.iter().map(json_to_prost_value).collect();
2590 Kind::JsonValue(prost_types::Value {
2591 kind: Some(prost_types::value::Kind::ListValue(
2592 prost_types::ListValue { values },
2593 )),
2594 })
2595 }
2596 serde_json::Value::Object(obj) => {
2597 let fields = obj
2598 .iter()
2599 .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
2600 .collect();
2601 Kind::JsonValue(prost_types::Value {
2602 kind: Some(prost_types::value::Kind::StructValue(prost_types::Struct {
2603 fields,
2604 })),
2605 })
2606 }
2607 };
2608 pb::TypedValue { kind: Some(kind) }
2609}
2610
2611fn prost_value_to_json(v: &prost_types::Value) -> serde_json::Value {
2612 use prost_types::value::Kind;
2613 match &v.kind {
2614 Some(Kind::NullValue(_)) => serde_json::Value::Null,
2615 Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
2616 Some(Kind::NumberValue(n)) => serde_json::json!(*n),
2617 Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
2618 Some(Kind::ListValue(list)) => {
2619 serde_json::Value::Array(list.values.iter().map(prost_value_to_json).collect())
2620 }
2621 Some(Kind::StructValue(st)) => {
2622 let obj: serde_json::Map<String, serde_json::Value> = st
2623 .fields
2624 .iter()
2625 .map(|(k, v)| (k.clone(), prost_value_to_json(v)))
2626 .collect();
2627 serde_json::Value::Object(obj)
2628 }
2629 None => serde_json::Value::Null,
2630 }
2631}
2632
2633fn json_to_prost_value(v: &serde_json::Value) -> prost_types::Value {
2634 use prost_types::value::Kind;
2635 let kind = match v {
2636 serde_json::Value::Null => Kind::NullValue(0),
2637 serde_json::Value::Bool(b) => Kind::BoolValue(*b),
2638 serde_json::Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)),
2639 serde_json::Value::String(s) => Kind::StringValue(s.clone()),
2640 serde_json::Value::Array(arr) => {
2641 let values = arr.iter().map(json_to_prost_value).collect();
2642 Kind::ListValue(prost_types::ListValue { values })
2643 }
2644 serde_json::Value::Object(obj) => {
2645 let fields = obj
2646 .iter()
2647 .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
2648 .collect();
2649 Kind::StructValue(prost_types::Struct { fields })
2650 }
2651 };
2652 prost_types::Value { kind: Some(kind) }
2653}
2654
2655fn key_value_to_json(kv: &pb::KeyValue) -> serde_json::Value {
2656 match &kv.kind {
2657 Some(pb::key_value::Kind::Scalar(tv)) => typed_value_to_json(tv),
2658 Some(pb::key_value::Kind::Array(arr)) => {
2659 serde_json::Value::Array(arr.elements.iter().map(key_value_to_json).collect())
2660 }
2661 None => serde_json::Value::Null,
2662 }
2663}
2664
2665fn json_to_key_value(v: &serde_json::Value) -> pb::KeyValue {
2666 if let serde_json::Value::Array(arr) = v {
2667 pb::KeyValue {
2668 kind: Some(pb::key_value::Kind::Array(pb::KeyValueArray {
2669 elements: arr.iter().map(json_to_key_value).collect(),
2670 })),
2671 }
2672 } else {
2673 pb::KeyValue {
2674 kind: Some(pb::key_value::Kind::Scalar(json_to_typed_value(v))),
2675 }
2676 }
2677}
2678
2679fn cursor_key_to_proto(key: &serde_json::Value, index_cursor: bool) -> Vec<pb::KeyValue> {
2680 if index_cursor {
2681 if let serde_json::Value::Array(parts) = key {
2682 return parts.iter().map(json_to_key_value).collect();
2683 }
2684 }
2685 vec![json_to_key_value(key)]
2686}
2687
2688fn typed_value_to_json(v: &pb::TypedValue) -> serde_json::Value {
2689 use pb::typed_value::Kind;
2690 match &v.kind {
2691 Some(Kind::NullValue(_)) => serde_json::Value::Null,
2692 Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
2693 Some(Kind::IntValue(i)) => serde_json::json!(*i),
2694 Some(Kind::FloatValue(f)) => serde_json::json!(*f),
2695 Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
2696 Some(Kind::BytesValue(b)) => serde_json::json!(b),
2697 Some(Kind::JsonValue(pv)) => prost_value_to_json(pv),
2698 Some(Kind::TimeValue(ts)) => {
2699 serde_json::Value::String(format!("{}.{}", ts.seconds, ts.nanos))
2700 }
2701 None => serde_json::Value::Null,
2702 }
2703}
2704
2705fn key_range_to_pb(kr: KeyRange) -> pb::KeyRange {
2706 pb::KeyRange {
2707 lower: kr.lower.map(|v| json_to_typed_value(&v)),
2708 upper: kr.upper.map(|v| json_to_typed_value(&v)),
2709 lower_open: kr.lower_open,
2710 upper_open: kr.upper_open,
2711 }
2712}
2713fn relay_token_interceptor(
2714 token: &str,
2715 binding: &str,
2716) -> Result<RelayTokenInterceptor, IndexedDBError> {
2717 let relay_token = if token.trim().is_empty() {
2718 None
2719 } else {
2720 Some(MetadataValue::try_from(token.to_string()).map_err(|err| {
2721 IndexedDBError::Env(format!("invalid IndexedDB relay token metadata: {err}"))
2722 })?)
2723 };
2724 let binding = if binding.trim().is_empty() {
2725 None
2726 } else {
2727 Some(
2728 MetadataValue::try_from(binding.trim().to_string()).map_err(|err| {
2729 IndexedDBError::Env(format!("invalid IndexedDB binding metadata: {err}"))
2730 })?,
2731 )
2732 };
2733 Ok(RelayTokenInterceptor {
2734 relay_token,
2735 binding,
2736 })
2737}
2738
2739#[derive(Clone)]
2740struct RelayTokenInterceptor {
2741 relay_token: Option<MetadataValue<tonic::metadata::Ascii>>,
2742 binding: Option<MetadataValue<tonic::metadata::Ascii>>,
2743}
2744
2745impl Interceptor for RelayTokenInterceptor {
2746 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, tonic::Status> {
2747 if let Some(header) = self.relay_token.clone() {
2748 request
2749 .metadata_mut()
2750 .insert(INDEXEDDB_RELAY_TOKEN_HEADER, header);
2751 }
2752 if let Some(header) = self.binding.clone() {
2753 request
2754 .metadata_mut()
2755 .insert(HOST_SERVICE_BINDING_HEADER, header);
2756 }
2757 Ok(request)
2758 }
2759}