1use std::collections::BTreeMap;
2
3use hyper_util::rt::TokioIo;
4use tokio::sync::mpsc;
5use tokio_stream::wrappers::ReceiverStream;
6use tonic::Request;
7use tonic::metadata::MetadataValue;
8use tonic::service::Interceptor;
9use tonic::service::interceptor::InterceptedService;
10use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
11use tower::service_fn;
12
13use crate::generated::v1::{self as pb, indexed_db_client::IndexedDbClient};
14
15type IndexedDbTransport = InterceptedService<Channel, RelayTokenInterceptor>;
16
17pub const ENV_INDEXEDDB_SOCKET: &str = "GESTALT_INDEXEDDB_SOCKET";
19pub const ENV_INDEXEDDB_SOCKET_TOKEN_SUFFIX: &str = "_TOKEN";
21const INDEXEDDB_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
22
23const CURSOR_CHANNEL_BUFFER: usize = 1;
24const TRANSACTION_CHANNEL_BUFFER: usize = 1;
25
26#[derive(Debug, thiserror::Error)]
27pub enum IndexedDBError {
29 #[error("not found")]
31 NotFound,
32 #[error("already exists")]
34 AlreadyExists,
35 #[error("cursor is keys-only; value not available")]
37 KeysOnly,
38 #[error("{0}")]
40 Transaction(String),
41 #[error("{0}")]
43 Transport(#[from] tonic::transport::Error),
44 #[error("{0}")]
46 Status(#[from] tonic::Status),
47 #[error("{0}")]
49 Env(String),
50}
51
52pub type Record = BTreeMap<String, serde_json::Value>;
54
55pub struct KeyRange {
57 pub lower: Option<serde_json::Value>,
59 pub upper: Option<serde_json::Value>,
61 pub lower_open: bool,
63 pub upper_open: bool,
65}
66
/// Definition of one secondary index, forwarded field-for-field to the
/// protobuf `IndexSchema` when an object store is created.
///
/// Derives `Debug`, `Clone`, `Default`, `PartialEq` and `Eq` so schemas can be
/// logged, reused and compared by callers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Key path of the index, as a list of path segments.
    // NOTE(review): segment semantics are defined by the server; confirm.
    pub key_path: Vec<String>,
    /// Uniqueness flag passed through to the server.
    pub unique: bool,
}

/// Schema supplied when creating an object store.
///
/// Only indexes are configurable here; `create_object_store` always sends an
/// empty column list alongside them.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ObjectStoreSchema {
    /// Indexes to create with the store.
    pub indexes: Vec<IndexSchema>,
}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
84pub enum CursorDirection {
86 Next,
88 NextUnique,
90 Prev,
92 PrevUnique,
94}
95
96impl CursorDirection {
97 fn to_proto(self) -> i32 {
98 match self {
99 Self::Next => pb::CursorDirection::CursorNext as i32,
100 Self::NextUnique => pb::CursorDirection::CursorNextUnique as i32,
101 Self::Prev => pb::CursorDirection::CursorPrev as i32,
102 Self::PrevUnique => pb::CursorDirection::CursorPrevUnique as i32,
103 }
104 }
105}
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
108pub enum TransactionMode {
110 Readonly,
112 Readwrite,
114}
115
116impl TransactionMode {
117 fn to_proto(self) -> i32 {
118 match self {
119 Self::Readonly => pb::TransactionMode::TransactionReadonly as i32,
120 Self::Readwrite => pb::TransactionMode::TransactionReadwrite as i32,
121 }
122 }
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
126pub enum TransactionDurabilityHint {
128 #[default]
130 Default,
131 Strict,
133 Relaxed,
135}
136
137impl TransactionDurabilityHint {
138 fn to_proto(self) -> i32 {
139 match self {
140 Self::Default => pb::TransactionDurabilityHint::TransactionDurabilityDefault as i32,
141 Self::Strict => pb::TransactionDurabilityHint::TransactionDurabilityStrict as i32,
142 Self::Relaxed => pb::TransactionDurabilityHint::TransactionDurabilityRelaxed as i32,
143 }
144 }
145}
146
147#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
148pub struct TransactionOptions {
150 pub durability_hint: TransactionDurabilityHint,
152}
153
154pub struct Cursor {
156 tx: mpsc::Sender<pb::CursorClientMessage>,
157 stream: tonic::Streaming<pb::CursorResponse>,
158 keys_only: bool,
159 index_cursor: bool,
160 entry: Option<pb::CursorEntry>,
161 done: bool,
162}
163
164impl Cursor {
165 pub fn key(&self) -> Option<serde_json::Value> {
167 let entry = self.entry.as_ref()?;
168 match entry.key.len() {
169 0 => None,
170 1 if !self.index_cursor => Some(key_value_to_json(&entry.key[0])),
171 _ => Some(serde_json::Value::Array(
172 entry.key.iter().map(key_value_to_json).collect(),
173 )),
174 }
175 }
176
177 pub fn primary_key(&self) -> &str {
179 self.entry
180 .as_ref()
181 .map(|e| e.primary_key.as_str())
182 .unwrap_or("")
183 }
184
185 pub fn value(&self) -> Result<Record, IndexedDBError> {
187 if self.keys_only {
188 return Err(IndexedDBError::KeysOnly);
189 }
190 let entry = self.entry.as_ref().ok_or(IndexedDBError::NotFound)?;
191 Ok(entry
192 .record
193 .as_ref()
194 .map(pb_record_to_record)
195 .unwrap_or_default())
196 }
197
198 pub async fn continue_next(&mut self) -> Result<bool, IndexedDBError> {
200 let cmd = pb::cursor_command::Command::Next(true);
201 self.send_and_recv(cmd).await
202 }
203
204 pub async fn continue_to_key(
206 &mut self,
207 key: serde_json::Value,
208 ) -> Result<bool, IndexedDBError> {
209 let cmd = pb::cursor_command::Command::ContinueToKey(pb::CursorKeyTarget {
210 key: cursor_key_to_proto(&key, self.index_cursor),
211 });
212 self.send_and_recv(cmd).await
213 }
214
215 pub async fn advance(&mut self, count: i32) -> Result<bool, IndexedDBError> {
217 let cmd = pb::cursor_command::Command::Advance(count);
218 self.send_and_recv(cmd).await
219 }
220
221 pub async fn delete(&mut self) -> Result<(), IndexedDBError> {
223 if self.done {
224 return Err(IndexedDBError::NotFound);
225 }
226 let cmd = pb::cursor_command::Command::Delete(true);
227 self.send_mutation(cmd).await
228 }
229
230 pub async fn update(&mut self, value: Record) -> Result<(), IndexedDBError> {
232 if self.done {
233 return Err(IndexedDBError::NotFound);
234 }
235 let cmd = pb::cursor_command::Command::Update(record_to_pb_record(value));
236 self.send_mutation(cmd).await
237 }
238
239 pub async fn close(self) -> Result<(), IndexedDBError> {
241 let msg = pb::CursorClientMessage {
242 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
243 command: Some(pb::cursor_command::Command::Close(true)),
244 })),
245 };
246 self.tx
247 .send(msg)
248 .await
249 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
250 Ok(())
251 }
252
253 async fn send_mutation(
254 &mut self,
255 cmd: pb::cursor_command::Command,
256 ) -> Result<(), IndexedDBError> {
257 let msg = pb::CursorClientMessage {
258 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
259 command: Some(cmd),
260 })),
261 };
262 self.tx
263 .send(msg)
264 .await
265 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
266 let resp = self
268 .stream
269 .message()
270 .await
271 .map_err(map_status)?
272 .ok_or_else(|| {
273 IndexedDBError::Status(tonic::Status::internal(
274 "cursor stream ended during mutation",
275 ))
276 })?;
277 match resp.result {
278 Some(pb::cursor_response::Result::Entry(entry)) => {
279 self.entry = Some(entry);
280 }
281 Some(pb::cursor_response::Result::Done(_)) => {}
282 None => {
283 return Err(IndexedDBError::Status(tonic::Status::internal(
284 "unexpected cursor mutation ack",
285 )));
286 }
287 }
288 Ok(())
289 }
290
291 async fn send_and_recv(
292 &mut self,
293 cmd: pb::cursor_command::Command,
294 ) -> Result<bool, IndexedDBError> {
295 if self.done {
296 return Ok(false);
297 }
298 let msg = pb::CursorClientMessage {
299 msg: Some(pb::cursor_client_message::Msg::Command(pb::CursorCommand {
300 command: Some(cmd),
301 })),
302 };
303 self.tx
304 .send(msg)
305 .await
306 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
307
308 let resp = self
309 .stream
310 .message()
311 .await
312 .map_err(map_status)?
313 .ok_or_else(|| {
314 IndexedDBError::Status(tonic::Status::internal("cursor stream ended"))
315 })?;
316
317 match resp.result {
318 Some(pb::cursor_response::Result::Entry(entry)) => {
319 self.entry = Some(entry);
320 self.done = false;
321 Ok(true)
322 }
323 Some(pb::cursor_response::Result::Done(exhausted)) => {
324 if exhausted {
325 self.done = true;
326 }
327 self.entry = None;
328 Ok(false)
329 }
330 None => {
331 self.entry = None;
332 self.done = true;
333 Ok(false)
334 }
335 }
336 }
337}
338
339async fn open_cursor_inner(
340 client: &mut IndexedDbClient<IndexedDbTransport>,
341 req: pb::OpenCursorRequest,
342) -> Result<Cursor, IndexedDBError> {
343 let keys_only = req.keys_only;
344 let is_index = !req.index.is_empty();
345 let (tx, rx) = mpsc::channel::<pb::CursorClientMessage>(CURSOR_CHANNEL_BUFFER);
346
347 let open_msg = pb::CursorClientMessage {
348 msg: Some(pb::cursor_client_message::Msg::Open(req)),
349 };
350 tx.send(open_msg)
351 .await
352 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
353
354 let receiver_stream = ReceiverStream::new(rx);
355 let mut stream = client
356 .open_cursor(receiver_stream)
357 .await
358 .map_err(map_status)?
359 .into_inner();
360
361 let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
363 IndexedDBError::Status(tonic::Status::internal("cursor stream ended during open"))
364 })?;
365 match ack.result {
366 Some(pb::cursor_response::Result::Done(false)) => {}
367 Some(pb::cursor_response::Result::Done(true)) => {
368 return Err(IndexedDBError::Status(tonic::Status::internal(
369 "unexpected exhausted cursor open ack",
370 )));
371 }
372 _ => {
373 return Err(IndexedDBError::Status(tonic::Status::internal(
374 "unexpected cursor open ack",
375 )));
376 }
377 }
378
379 Ok(Cursor {
380 tx,
381 stream,
382 keys_only,
383 entry: None,
384 done: false,
385 index_cursor: is_index,
386 })
387}
388
389pub struct IndexedDB {
391 client: IndexedDbClient<IndexedDbTransport>,
392}
393
394impl IndexedDB {
395 pub async fn connect() -> Result<Self, IndexedDBError> {
397 Self::connect_named("").await
398 }
399
400 pub async fn connect_named(name: &str) -> Result<Self, IndexedDBError> {
402 let env_name = indexeddb_socket_env(name);
403 let target = std::env::var(&env_name)
404 .map_err(|_| IndexedDBError::Env(format!("{env_name} is not set")))?;
405 let token = std::env::var(indexeddb_socket_token_env(name)).unwrap_or_default();
406 let channel = match parse_indexeddb_target(&target)? {
407 IndexedDBTarget::Unix(path) => {
408 Endpoint::try_from("http://[::]:50051")?
409 .connect_with_connector(service_fn(move |_: Uri| {
410 let path = path.clone();
411 async move {
412 tokio::net::UnixStream::connect(path)
413 .await
414 .map(TokioIo::new)
415 }
416 }))
417 .await?
418 }
419 IndexedDBTarget::Tcp(address) => {
420 Endpoint::from_shared(format!("http://{address}"))?
421 .connect()
422 .await?
423 }
424 IndexedDBTarget::Tls(address) => {
425 Endpoint::from_shared(format!("https://{address}"))?
426 .tls_config(ClientTlsConfig::new().with_native_roots())?
427 .connect()
428 .await?
429 }
430 };
431
432 let client =
433 IndexedDbClient::with_interceptor(channel, relay_token_interceptor(token.trim())?);
434
435 Ok(Self { client })
436 }
437
438 pub async fn create_object_store(
440 &mut self,
441 name: &str,
442 schema: ObjectStoreSchema,
443 ) -> Result<(), IndexedDBError> {
444 let indexes = schema
445 .indexes
446 .into_iter()
447 .map(|idx| pb::IndexSchema {
448 name: idx.name,
449 key_path: idx.key_path,
450 unique: idx.unique,
451 })
452 .collect();
453 self.client
454 .create_object_store(pb::CreateObjectStoreRequest {
455 name: name.to_string(),
456 schema: Some(pb::ObjectStoreSchema {
457 indexes,
458 columns: vec![],
459 }),
460 })
461 .await
462 .map_err(map_status)?;
463 Ok(())
464 }
465
466 pub async fn delete_object_store(&mut self, name: &str) -> Result<(), IndexedDBError> {
468 self.client
469 .delete_object_store(pb::DeleteObjectStoreRequest {
470 name: name.to_string(),
471 })
472 .await
473 .map_err(map_status)?;
474 Ok(())
475 }
476
477 pub fn object_store(&self, name: &str) -> ObjectStore {
479 ObjectStore {
480 client: self.client.clone(),
481 store: name.to_string(),
482 }
483 }
484
485 pub async fn transaction(
487 &self,
488 stores: &[&str],
489 mode: TransactionMode,
490 options: TransactionOptions,
491 ) -> Result<Transaction, IndexedDBError> {
492 let (tx, rx) = mpsc::channel::<pb::TransactionClientMessage>(TRANSACTION_CHANNEL_BUFFER);
493 tx.send(pb::TransactionClientMessage {
494 msg: Some(pb::transaction_client_message::Msg::Begin(
495 pb::BeginTransactionRequest {
496 stores: stores.iter().map(|store| store.to_string()).collect(),
497 mode: mode.to_proto(),
498 durability_hint: options.durability_hint.to_proto(),
499 },
500 )),
501 })
502 .await
503 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
504
505 let receiver_stream = ReceiverStream::new(rx);
506 let mut client = self.client.clone();
507 let mut stream = client
508 .transaction(receiver_stream)
509 .await
510 .map_err(map_status)?
511 .into_inner();
512
513 let ack = stream.message().await.map_err(map_status)?.ok_or_else(|| {
514 IndexedDBError::Transaction("transaction stream ended during begin".to_string())
515 })?;
516 match ack.msg {
517 Some(pb::transaction_server_message::Msg::Begin(_)) => {}
518 _ => {
519 return Err(IndexedDBError::Transaction(
520 "expected transaction begin response".to_string(),
521 ));
522 }
523 }
524
525 Ok(Transaction {
526 tx: Some(tx),
527 stream,
528 request_id: 0,
529 closed: false,
530 })
531 }
532}
533
534pub struct Transaction {
536 tx: Option<mpsc::Sender<pb::TransactionClientMessage>>,
537 stream: tonic::Streaming<pb::TransactionServerMessage>,
538 request_id: u64,
539 closed: bool,
540}
541
542impl Transaction {
543 pub fn object_store<'a>(&'a mut self, name: &str) -> TransactionObjectStore<'a> {
545 TransactionObjectStore {
546 tx: self,
547 store: name.to_string(),
548 }
549 }
550
551 pub async fn commit(&mut self) -> Result<(), IndexedDBError> {
553 self.ensure_open()?;
554 let tx = self.tx.as_ref().ok_or_else(|| {
555 IndexedDBError::Transaction("transaction is already finished".to_string())
556 })?;
557 tx.send(pb::TransactionClientMessage {
558 msg: Some(pb::transaction_client_message::Msg::Commit(
559 pb::TransactionCommitRequest {},
560 )),
561 })
562 .await
563 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
564 self.closed = true;
565 self.tx.take();
566
567 let resp = self
568 .stream
569 .message()
570 .await
571 .map_err(map_status)?
572 .ok_or_else(|| {
573 IndexedDBError::Transaction("transaction stream ended during commit".to_string())
574 })?;
575 match resp.msg {
576 Some(pb::transaction_server_message::Msg::Commit(commit)) => {
577 map_rpc_status(commit.error)
578 }
579 _ => Err(IndexedDBError::Transaction(
580 "expected transaction commit response".to_string(),
581 )),
582 }
583 }
584
585 pub async fn abort(&mut self, reason: &str) -> Result<(), IndexedDBError> {
587 if self.closed {
588 return Ok(());
589 }
590 let tx = self.tx.as_ref().ok_or_else(|| {
591 IndexedDBError::Transaction("transaction is already finished".to_string())
592 })?;
593 tx.send(pb::TransactionClientMessage {
594 msg: Some(pb::transaction_client_message::Msg::Abort(
595 pb::TransactionAbortRequest {
596 reason: reason.to_string(),
597 },
598 )),
599 })
600 .await
601 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
602 self.closed = true;
603 self.tx.take();
604
605 let resp = self
606 .stream
607 .message()
608 .await
609 .map_err(map_status)?
610 .ok_or_else(|| {
611 IndexedDBError::Transaction("transaction stream ended during abort".to_string())
612 })?;
613 match resp.msg {
614 Some(pb::transaction_server_message::Msg::Abort(abort)) => map_rpc_status(abort.error),
615 _ => Err(IndexedDBError::Transaction(
616 "expected transaction abort response".to_string(),
617 )),
618 }
619 }
620
621 async fn send_operation(
622 &mut self,
623 operation: pb::transaction_operation::Operation,
624 ) -> Result<pb::TransactionOperationResponse, IndexedDBError> {
625 self.ensure_open()?;
626 self.request_id += 1;
627 let request_id = self.request_id;
628 let tx = self.tx.as_ref().ok_or_else(|| {
629 IndexedDBError::Transaction("transaction is already finished".to_string())
630 })?;
631 tx.send(pb::TransactionClientMessage {
632 msg: Some(pb::transaction_client_message::Msg::Operation(
633 pb::TransactionOperation {
634 request_id,
635 operation: Some(operation),
636 },
637 )),
638 })
639 .await
640 .map_err(|e| IndexedDBError::Status(tonic::Status::internal(e.to_string())))?;
641
642 let resp = self
643 .stream
644 .message()
645 .await
646 .map_err(map_status)?
647 .ok_or_else(|| {
648 IndexedDBError::Transaction("transaction stream ended during operation".to_string())
649 })?;
650 let op = match resp.msg {
651 Some(pb::transaction_server_message::Msg::Operation(op)) => op,
652 _ => {
653 self.close_locally();
654 return Err(IndexedDBError::Transaction(
655 "expected transaction operation response".to_string(),
656 ));
657 }
658 };
659 if op.request_id != request_id {
660 self.close_locally();
661 return Err(IndexedDBError::Transaction(
662 "transaction response request id mismatch".to_string(),
663 ));
664 }
665 if let Err(err) = map_rpc_status(op.error.clone()) {
666 self.close_locally();
667 return Err(err);
668 }
669 Ok(op)
670 }
671
672 fn ensure_open(&self) -> Result<(), IndexedDBError> {
673 if self.closed {
674 return Err(IndexedDBError::Transaction(
675 "transaction is already finished".to_string(),
676 ));
677 }
678 Ok(())
679 }
680
681 fn close_locally(&mut self) {
682 self.closed = true;
683 self.tx.take();
684 }
685}
686
687pub struct TransactionObjectStore<'a> {
689 tx: &'a mut Transaction,
690 store: String,
691}
692
693impl TransactionObjectStore<'_> {
694 pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
696 let resp = self
697 .tx
698 .send_operation(pb::transaction_operation::Operation::Get(
699 pb::ObjectStoreRequest {
700 store: self.store.clone(),
701 id: id.to_string(),
702 },
703 ))
704 .await?;
705 match resp.result {
706 Some(pb::transaction_operation_response::Result::Record(record)) => Ok(record
707 .record
708 .as_ref()
709 .map(pb_record_to_record)
710 .unwrap_or_default()),
711 _ => Err(unexpected_transaction_result()),
712 }
713 }
714
715 pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
717 let resp = self
718 .tx
719 .send_operation(pb::transaction_operation::Operation::GetKey(
720 pb::ObjectStoreRequest {
721 store: self.store.clone(),
722 id: id.to_string(),
723 },
724 ))
725 .await?;
726 match resp.result {
727 Some(pb::transaction_operation_response::Result::Key(key)) => Ok(key.key),
728 _ => Err(unexpected_transaction_result()),
729 }
730 }
731
732 pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
734 self.tx
735 .send_operation(pb::transaction_operation::Operation::Add(
736 pb::RecordRequest {
737 store: self.store.clone(),
738 record: Some(record_to_pb_record(record)),
739 },
740 ))
741 .await?;
742 Ok(())
743 }
744
745 pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
747 self.tx
748 .send_operation(pb::transaction_operation::Operation::Put(
749 pb::RecordRequest {
750 store: self.store.clone(),
751 record: Some(record_to_pb_record(record)),
752 },
753 ))
754 .await?;
755 Ok(())
756 }
757
758 pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
760 self.tx
761 .send_operation(pb::transaction_operation::Operation::Delete(
762 pb::ObjectStoreRequest {
763 store: self.store.clone(),
764 id: id.to_string(),
765 },
766 ))
767 .await?;
768 Ok(())
769 }
770
771 pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
773 self.tx
774 .send_operation(pb::transaction_operation::Operation::Clear(
775 pb::ObjectStoreNameRequest {
776 store: self.store.clone(),
777 },
778 ))
779 .await?;
780 Ok(())
781 }
782
783 pub async fn get_all(
785 &mut self,
786 range: Option<KeyRange>,
787 ) -> Result<Vec<Record>, IndexedDBError> {
788 let resp = self
789 .tx
790 .send_operation(pb::transaction_operation::Operation::GetAll(
791 pb::ObjectStoreRangeRequest {
792 store: self.store.clone(),
793 range: range.map(key_range_to_pb),
794 },
795 ))
796 .await?;
797 match resp.result {
798 Some(pb::transaction_operation_response::Result::Records(records)) => {
799 Ok(records.records.iter().map(pb_record_to_record).collect())
800 }
801 _ => Err(unexpected_transaction_result()),
802 }
803 }
804
805 pub async fn get_all_keys(
807 &mut self,
808 range: Option<KeyRange>,
809 ) -> Result<Vec<String>, IndexedDBError> {
810 let resp = self
811 .tx
812 .send_operation(pb::transaction_operation::Operation::GetAllKeys(
813 pb::ObjectStoreRangeRequest {
814 store: self.store.clone(),
815 range: range.map(key_range_to_pb),
816 },
817 ))
818 .await?;
819 match resp.result {
820 Some(pb::transaction_operation_response::Result::Keys(keys)) => Ok(keys.keys),
821 _ => Err(unexpected_transaction_result()),
822 }
823 }
824
825 pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
827 let resp = self
828 .tx
829 .send_operation(pb::transaction_operation::Operation::Count(
830 pb::ObjectStoreRangeRequest {
831 store: self.store.clone(),
832 range: range.map(key_range_to_pb),
833 },
834 ))
835 .await?;
836 match resp.result {
837 Some(pb::transaction_operation_response::Result::Count(count)) => Ok(count.count),
838 _ => Err(unexpected_transaction_result()),
839 }
840 }
841
842 pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
844 let resp = self
845 .tx
846 .send_operation(pb::transaction_operation::Operation::DeleteRange(
847 pb::ObjectStoreRangeRequest {
848 store: self.store.clone(),
849 range: Some(key_range_to_pb(range)),
850 },
851 ))
852 .await?;
853 match resp.result {
854 Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
855 Ok(deleted.deleted)
856 }
857 _ => Err(unexpected_transaction_result()),
858 }
859 }
860
861 pub fn index<'a>(&'a mut self, name: &str) -> TransactionIndexClient<'a> {
863 TransactionIndexClient {
864 tx: &mut *self.tx,
865 store: self.store.clone(),
866 index: name.to_string(),
867 }
868 }
869}
870
871pub struct TransactionIndexClient<'a> {
873 tx: &'a mut Transaction,
874 store: String,
875 index: String,
876}
877
878impl TransactionIndexClient<'_> {
879 pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
881 let resp = self
882 .tx
883 .send_operation(pb::transaction_operation::Operation::IndexGet(
884 self.index_request(values, None),
885 ))
886 .await?;
887 match resp.result {
888 Some(pb::transaction_operation_response::Result::Record(record)) => Ok(record
889 .record
890 .as_ref()
891 .map(pb_record_to_record)
892 .unwrap_or_default()),
893 _ => Err(unexpected_transaction_result()),
894 }
895 }
896
897 pub async fn get_key(
899 &mut self,
900 values: &[serde_json::Value],
901 ) -> Result<String, IndexedDBError> {
902 let resp = self
903 .tx
904 .send_operation(pb::transaction_operation::Operation::IndexGetKey(
905 self.index_request(values, None),
906 ))
907 .await?;
908 match resp.result {
909 Some(pb::transaction_operation_response::Result::Key(key)) => Ok(key.key),
910 _ => Err(unexpected_transaction_result()),
911 }
912 }
913
914 pub async fn get_all(
916 &mut self,
917 values: &[serde_json::Value],
918 range: Option<KeyRange>,
919 ) -> Result<Vec<Record>, IndexedDBError> {
920 let resp = self
921 .tx
922 .send_operation(pb::transaction_operation::Operation::IndexGetAll(
923 self.index_request(values, range),
924 ))
925 .await?;
926 match resp.result {
927 Some(pb::transaction_operation_response::Result::Records(records)) => {
928 Ok(records.records.iter().map(pb_record_to_record).collect())
929 }
930 _ => Err(unexpected_transaction_result()),
931 }
932 }
933
934 pub async fn get_all_keys(
936 &mut self,
937 values: &[serde_json::Value],
938 range: Option<KeyRange>,
939 ) -> Result<Vec<String>, IndexedDBError> {
940 let resp = self
941 .tx
942 .send_operation(pb::transaction_operation::Operation::IndexGetAllKeys(
943 self.index_request(values, range),
944 ))
945 .await?;
946 match resp.result {
947 Some(pb::transaction_operation_response::Result::Keys(keys)) => Ok(keys.keys),
948 _ => Err(unexpected_transaction_result()),
949 }
950 }
951
952 pub async fn count(
954 &mut self,
955 values: &[serde_json::Value],
956 range: Option<KeyRange>,
957 ) -> Result<i64, IndexedDBError> {
958 let resp = self
959 .tx
960 .send_operation(pb::transaction_operation::Operation::IndexCount(
961 self.index_request(values, range),
962 ))
963 .await?;
964 match resp.result {
965 Some(pb::transaction_operation_response::Result::Count(count)) => Ok(count.count),
966 _ => Err(unexpected_transaction_result()),
967 }
968 }
969
970 pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
972 let resp = self
973 .tx
974 .send_operation(pb::transaction_operation::Operation::IndexDelete(
975 self.index_request(values, None),
976 ))
977 .await?;
978 match resp.result {
979 Some(pb::transaction_operation_response::Result::Delete(deleted)) => {
980 Ok(deleted.deleted)
981 }
982 _ => Err(unexpected_transaction_result()),
983 }
984 }
985
986 fn index_request(
987 &self,
988 values: &[serde_json::Value],
989 range: Option<KeyRange>,
990 ) -> pb::IndexQueryRequest {
991 pb::IndexQueryRequest {
992 store: self.store.clone(),
993 index: self.index.clone(),
994 values: values.iter().map(json_to_typed_value).collect(),
995 range: range.map(key_range_to_pb),
996 }
997 }
998}
999
/// Parsed transport target for the IndexedDB service.
enum IndexedDBTarget {
    /// Unix domain socket path.
    Unix(String),
    /// Plaintext address, dialled as `http://{address}`.
    Tcp(String),
    /// TLS address, dialled as `https://{address}`.
    Tls(String),
}
1005
1006fn parse_indexeddb_target(raw_target: &str) -> Result<IndexedDBTarget, IndexedDBError> {
1007 let target = raw_target.trim();
1008 if target.is_empty() {
1009 return Err(IndexedDBError::Env(
1010 "IndexedDB transport target is required".to_string(),
1011 ));
1012 }
1013 if let Some(address) = target.strip_prefix("tcp://") {
1014 let address = address.trim();
1015 if address.is_empty() {
1016 return Err(IndexedDBError::Env(format!(
1017 "IndexedDB tcp target {raw_target:?} is missing host:port"
1018 )));
1019 }
1020 return Ok(IndexedDBTarget::Tcp(address.to_string()));
1021 }
1022 if let Some(address) = target.strip_prefix("tls://") {
1023 let address = address.trim();
1024 if address.is_empty() {
1025 return Err(IndexedDBError::Env(format!(
1026 "IndexedDB tls target {raw_target:?} is missing host:port"
1027 )));
1028 }
1029 return Ok(IndexedDBTarget::Tls(address.to_string()));
1030 }
1031 if let Some(path) = target.strip_prefix("unix://") {
1032 let path = path.trim();
1033 if path.is_empty() {
1034 return Err(IndexedDBError::Env(format!(
1035 "IndexedDB unix target {raw_target:?} is missing a socket path"
1036 )));
1037 }
1038 return Ok(IndexedDBTarget::Unix(path.to_string()));
1039 }
1040 if target.contains("://") {
1041 let scheme = target.split("://").next().unwrap_or_default();
1042 return Err(IndexedDBError::Env(format!(
1043 "unsupported IndexedDB target scheme {scheme:?}"
1044 )));
1045 }
1046 Ok(IndexedDBTarget::Unix(target.to_string()))
1047}
1048
1049pub struct ObjectStore {
1051 client: IndexedDbClient<IndexedDbTransport>,
1052 store: String,
1053}
1054
1055impl ObjectStore {
1056 pub async fn get(&mut self, id: &str) -> Result<Record, IndexedDBError> {
1058 let resp = self
1059 .client
1060 .get(pb::ObjectStoreRequest {
1061 store: self.store.clone(),
1062 id: id.to_string(),
1063 })
1064 .await
1065 .map_err(map_status)?;
1066 Ok(resp
1067 .into_inner()
1068 .record
1069 .as_ref()
1070 .map(pb_record_to_record)
1071 .unwrap_or_default())
1072 }
1073
1074 pub async fn get_key(&mut self, id: &str) -> Result<String, IndexedDBError> {
1076 let resp = self
1077 .client
1078 .get_key(pb::ObjectStoreRequest {
1079 store: self.store.clone(),
1080 id: id.to_string(),
1081 })
1082 .await
1083 .map_err(map_status)?;
1084 Ok(resp.into_inner().key)
1085 }
1086
1087 pub async fn add(&mut self, record: Record) -> Result<(), IndexedDBError> {
1089 self.client
1090 .add(pb::RecordRequest {
1091 store: self.store.clone(),
1092 record: Some(record_to_pb_record(record)),
1093 })
1094 .await
1095 .map_err(map_status)?;
1096 Ok(())
1097 }
1098
1099 pub async fn put(&mut self, record: Record) -> Result<(), IndexedDBError> {
1101 self.client
1102 .put(pb::RecordRequest {
1103 store: self.store.clone(),
1104 record: Some(record_to_pb_record(record)),
1105 })
1106 .await
1107 .map_err(map_status)?;
1108 Ok(())
1109 }
1110
1111 pub async fn delete(&mut self, id: &str) -> Result<(), IndexedDBError> {
1113 self.client
1114 .delete(pb::ObjectStoreRequest {
1115 store: self.store.clone(),
1116 id: id.to_string(),
1117 })
1118 .await
1119 .map_err(map_status)?;
1120 Ok(())
1121 }
1122
1123 pub async fn clear(&mut self) -> Result<(), IndexedDBError> {
1125 self.client
1126 .clear(pb::ObjectStoreNameRequest {
1127 store: self.store.clone(),
1128 })
1129 .await
1130 .map_err(map_status)?;
1131 Ok(())
1132 }
1133
1134 pub async fn get_all(
1136 &mut self,
1137 range: Option<KeyRange>,
1138 ) -> Result<Vec<Record>, IndexedDBError> {
1139 let resp = self
1140 .client
1141 .get_all(pb::ObjectStoreRangeRequest {
1142 store: self.store.clone(),
1143 range: range.map(key_range_to_pb),
1144 })
1145 .await
1146 .map_err(map_status)?;
1147 Ok(resp
1148 .into_inner()
1149 .records
1150 .iter()
1151 .map(pb_record_to_record)
1152 .collect())
1153 }
1154
1155 pub async fn get_all_keys(
1157 &mut self,
1158 range: Option<KeyRange>,
1159 ) -> Result<Vec<String>, IndexedDBError> {
1160 let resp = self
1161 .client
1162 .get_all_keys(pb::ObjectStoreRangeRequest {
1163 store: self.store.clone(),
1164 range: range.map(key_range_to_pb),
1165 })
1166 .await
1167 .map_err(map_status)?;
1168 Ok(resp.into_inner().keys)
1169 }
1170
1171 pub async fn count(&mut self, range: Option<KeyRange>) -> Result<i64, IndexedDBError> {
1173 let resp = self
1174 .client
1175 .count(pb::ObjectStoreRangeRequest {
1176 store: self.store.clone(),
1177 range: range.map(key_range_to_pb),
1178 })
1179 .await
1180 .map_err(map_status)?;
1181 Ok(resp.into_inner().count)
1182 }
1183
1184 pub async fn delete_range(&mut self, range: KeyRange) -> Result<i64, IndexedDBError> {
1186 let resp = self
1187 .client
1188 .delete_range(pb::ObjectStoreRangeRequest {
1189 store: self.store.clone(),
1190 range: Some(key_range_to_pb(range)),
1191 })
1192 .await
1193 .map_err(map_status)?;
1194 Ok(resp.into_inner().deleted)
1195 }
1196
1197 pub fn index(&self, name: &str) -> IndexClient {
1199 IndexClient {
1200 client: self.client.clone(),
1201 store: self.store.clone(),
1202 index: name.to_string(),
1203 }
1204 }
1205
1206 pub async fn open_cursor(
1208 &mut self,
1209 range: Option<KeyRange>,
1210 direction: CursorDirection,
1211 ) -> Result<Cursor, IndexedDBError> {
1212 let req = pb::OpenCursorRequest {
1213 store: self.store.clone(),
1214 range: range.map(key_range_to_pb),
1215 direction: direction.to_proto(),
1216 keys_only: false,
1217 index: String::new(),
1218 values: vec![],
1219 };
1220 open_cursor_inner(&mut self.client, req).await
1221 }
1222
1223 pub async fn open_key_cursor(
1225 &mut self,
1226 range: Option<KeyRange>,
1227 direction: CursorDirection,
1228 ) -> Result<Cursor, IndexedDBError> {
1229 let req = pb::OpenCursorRequest {
1230 store: self.store.clone(),
1231 range: range.map(key_range_to_pb),
1232 direction: direction.to_proto(),
1233 keys_only: true,
1234 index: String::new(),
1235 values: vec![],
1236 };
1237 open_cursor_inner(&mut self.client, req).await
1238 }
1239}
1240
1241pub struct IndexClient {
1243 client: IndexedDbClient<IndexedDbTransport>,
1244 store: String,
1245 index: String,
1246}
1247
1248impl IndexClient {
1249 pub async fn get(&mut self, values: &[serde_json::Value]) -> Result<Record, IndexedDBError> {
1251 let resp = self
1252 .client
1253 .index_get(pb::IndexQueryRequest {
1254 store: self.store.clone(),
1255 index: self.index.clone(),
1256 values: values.iter().map(json_to_typed_value).collect(),
1257 range: None,
1258 })
1259 .await
1260 .map_err(map_status)?;
1261 Ok(resp
1262 .into_inner()
1263 .record
1264 .as_ref()
1265 .map(pb_record_to_record)
1266 .unwrap_or_default())
1267 }
1268
1269 pub async fn get_key(
1271 &mut self,
1272 values: &[serde_json::Value],
1273 ) -> Result<String, IndexedDBError> {
1274 let resp = self
1275 .client
1276 .index_get_key(pb::IndexQueryRequest {
1277 store: self.store.clone(),
1278 index: self.index.clone(),
1279 values: values.iter().map(json_to_typed_value).collect(),
1280 range: None,
1281 })
1282 .await
1283 .map_err(map_status)?;
1284 Ok(resp.into_inner().key)
1285 }
1286
1287 pub async fn get_all(
1289 &mut self,
1290 values: &[serde_json::Value],
1291 range: Option<KeyRange>,
1292 ) -> Result<Vec<Record>, IndexedDBError> {
1293 let resp = self
1294 .client
1295 .index_get_all(pb::IndexQueryRequest {
1296 store: self.store.clone(),
1297 index: self.index.clone(),
1298 values: values.iter().map(json_to_typed_value).collect(),
1299 range: range.map(key_range_to_pb),
1300 })
1301 .await
1302 .map_err(map_status)?;
1303 Ok(resp
1304 .into_inner()
1305 .records
1306 .iter()
1307 .map(pb_record_to_record)
1308 .collect())
1309 }
1310
1311 pub async fn get_all_keys(
1313 &mut self,
1314 values: &[serde_json::Value],
1315 range: Option<KeyRange>,
1316 ) -> Result<Vec<String>, IndexedDBError> {
1317 let resp = self
1318 .client
1319 .index_get_all_keys(pb::IndexQueryRequest {
1320 store: self.store.clone(),
1321 index: self.index.clone(),
1322 values: values.iter().map(json_to_typed_value).collect(),
1323 range: range.map(key_range_to_pb),
1324 })
1325 .await
1326 .map_err(map_status)?;
1327 Ok(resp.into_inner().keys)
1328 }
1329
1330 pub async fn count(
1332 &mut self,
1333 values: &[serde_json::Value],
1334 range: Option<KeyRange>,
1335 ) -> Result<i64, IndexedDBError> {
1336 let resp = self
1337 .client
1338 .index_count(pb::IndexQueryRequest {
1339 store: self.store.clone(),
1340 index: self.index.clone(),
1341 values: values.iter().map(json_to_typed_value).collect(),
1342 range: range.map(key_range_to_pb),
1343 })
1344 .await
1345 .map_err(map_status)?;
1346 Ok(resp.into_inner().count)
1347 }
1348
1349 pub async fn delete(&mut self, values: &[serde_json::Value]) -> Result<i64, IndexedDBError> {
1351 let resp = self
1352 .client
1353 .index_delete(pb::IndexQueryRequest {
1354 store: self.store.clone(),
1355 index: self.index.clone(),
1356 values: values.iter().map(json_to_typed_value).collect(),
1357 range: None,
1358 })
1359 .await
1360 .map_err(map_status)?;
1361 Ok(resp.into_inner().deleted)
1362 }
1363
1364 pub async fn open_cursor(
1366 &mut self,
1367 values: &[serde_json::Value],
1368 range: Option<KeyRange>,
1369 direction: CursorDirection,
1370 ) -> Result<Cursor, IndexedDBError> {
1371 let req = pb::OpenCursorRequest {
1372 store: self.store.clone(),
1373 range: range.map(key_range_to_pb),
1374 direction: direction.to_proto(),
1375 keys_only: false,
1376 index: self.index.clone(),
1377 values: values.iter().map(json_to_typed_value).collect(),
1378 };
1379 open_cursor_inner(&mut self.client, req).await
1380 }
1381
1382 pub async fn open_key_cursor(
1384 &mut self,
1385 values: &[serde_json::Value],
1386 range: Option<KeyRange>,
1387 direction: CursorDirection,
1388 ) -> Result<Cursor, IndexedDBError> {
1389 let req = pb::OpenCursorRequest {
1390 store: self.store.clone(),
1391 range: range.map(key_range_to_pb),
1392 direction: direction.to_proto(),
1393 keys_only: true,
1394 index: self.index.clone(),
1395 values: values.iter().map(json_to_typed_value).collect(),
1396 };
1397 open_cursor_inner(&mut self.client, req).await
1398 }
1399}
1400
1401fn map_status(err: tonic::Status) -> IndexedDBError {
1402 match err.code() {
1403 tonic::Code::NotFound => IndexedDBError::NotFound,
1404 tonic::Code::AlreadyExists => IndexedDBError::AlreadyExists,
1405 _ => IndexedDBError::Status(err),
1406 }
1407}
1408
1409fn map_rpc_status(
1410 status: Option<crate::generated::google::rpc::Status>,
1411) -> Result<(), IndexedDBError> {
1412 let Some(status) = status else {
1413 return Ok(());
1414 };
1415 match status.code {
1416 0 => Ok(()),
1417 5 => Err(IndexedDBError::NotFound),
1418 6 => Err(IndexedDBError::AlreadyExists),
1419 3 | 9 => Err(IndexedDBError::Transaction(status.message)),
1420 _ => Err(IndexedDBError::Transaction(status.message)),
1421 }
1422}
1423
1424fn unexpected_transaction_result() -> IndexedDBError {
1425 IndexedDBError::Transaction("unexpected transaction operation result".to_string())
1426}
1427
1428fn record_to_pb_record(record: Record) -> pb::Record {
1429 pb::Record {
1430 fields: record
1431 .into_iter()
1432 .map(|(k, v)| (k, json_to_typed_value(&v)))
1433 .collect(),
1434 }
1435}
1436
1437fn pb_record_to_record(r: &pb::Record) -> Record {
1438 r.fields
1439 .iter()
1440 .map(|(k, v)| (k.clone(), typed_value_to_json(v)))
1441 .collect()
1442}
1443
1444fn json_to_typed_value(v: &serde_json::Value) -> pb::TypedValue {
1445 use pb::typed_value::Kind;
1446 let kind = match v {
1447 serde_json::Value::Null => Kind::NullValue(0),
1448 serde_json::Value::Bool(b) => Kind::BoolValue(*b),
1449 serde_json::Value::Number(n) => {
1450 if let Some(i) = n.as_i64() {
1451 Kind::IntValue(i)
1452 } else {
1453 Kind::FloatValue(n.as_f64().unwrap_or(0.0))
1454 }
1455 }
1456 serde_json::Value::String(s) => Kind::StringValue(s.clone()),
1457 serde_json::Value::Array(arr) => {
1458 let values = arr.iter().map(json_to_prost_value).collect();
1459 Kind::JsonValue(prost_types::Value {
1460 kind: Some(prost_types::value::Kind::ListValue(
1461 prost_types::ListValue { values },
1462 )),
1463 })
1464 }
1465 serde_json::Value::Object(obj) => {
1466 let fields = obj
1467 .iter()
1468 .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
1469 .collect();
1470 Kind::JsonValue(prost_types::Value {
1471 kind: Some(prost_types::value::Kind::StructValue(prost_types::Struct {
1472 fields,
1473 })),
1474 })
1475 }
1476 };
1477 pb::TypedValue { kind: Some(kind) }
1478}
1479
1480fn prost_value_to_json(v: &prost_types::Value) -> serde_json::Value {
1481 use prost_types::value::Kind;
1482 match &v.kind {
1483 Some(Kind::NullValue(_)) => serde_json::Value::Null,
1484 Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
1485 Some(Kind::NumberValue(n)) => serde_json::json!(*n),
1486 Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
1487 Some(Kind::ListValue(list)) => {
1488 serde_json::Value::Array(list.values.iter().map(prost_value_to_json).collect())
1489 }
1490 Some(Kind::StructValue(st)) => {
1491 let obj: serde_json::Map<String, serde_json::Value> = st
1492 .fields
1493 .iter()
1494 .map(|(k, v)| (k.clone(), prost_value_to_json(v)))
1495 .collect();
1496 serde_json::Value::Object(obj)
1497 }
1498 None => serde_json::Value::Null,
1499 }
1500}
1501
1502fn json_to_prost_value(v: &serde_json::Value) -> prost_types::Value {
1503 use prost_types::value::Kind;
1504 let kind = match v {
1505 serde_json::Value::Null => Kind::NullValue(0),
1506 serde_json::Value::Bool(b) => Kind::BoolValue(*b),
1507 serde_json::Value::Number(n) => Kind::NumberValue(n.as_f64().unwrap_or(0.0)),
1508 serde_json::Value::String(s) => Kind::StringValue(s.clone()),
1509 serde_json::Value::Array(arr) => {
1510 let values = arr.iter().map(json_to_prost_value).collect();
1511 Kind::ListValue(prost_types::ListValue { values })
1512 }
1513 serde_json::Value::Object(obj) => {
1514 let fields = obj
1515 .iter()
1516 .map(|(k, v)| (k.clone(), json_to_prost_value(v)))
1517 .collect();
1518 Kind::StructValue(prost_types::Struct { fields })
1519 }
1520 };
1521 prost_types::Value { kind: Some(kind) }
1522}
1523
1524fn key_value_to_json(kv: &pb::KeyValue) -> serde_json::Value {
1525 match &kv.kind {
1526 Some(pb::key_value::Kind::Scalar(tv)) => typed_value_to_json(tv),
1527 Some(pb::key_value::Kind::Array(arr)) => {
1528 serde_json::Value::Array(arr.elements.iter().map(key_value_to_json).collect())
1529 }
1530 None => serde_json::Value::Null,
1531 }
1532}
1533
1534fn json_to_key_value(v: &serde_json::Value) -> pb::KeyValue {
1535 if let serde_json::Value::Array(arr) = v {
1536 pb::KeyValue {
1537 kind: Some(pb::key_value::Kind::Array(pb::KeyValueArray {
1538 elements: arr.iter().map(json_to_key_value).collect(),
1539 })),
1540 }
1541 } else {
1542 pb::KeyValue {
1543 kind: Some(pb::key_value::Kind::Scalar(json_to_typed_value(v))),
1544 }
1545 }
1546}
1547
1548fn cursor_key_to_proto(key: &serde_json::Value, index_cursor: bool) -> Vec<pb::KeyValue> {
1549 if index_cursor {
1550 if let serde_json::Value::Array(parts) = key {
1551 return parts.iter().map(json_to_key_value).collect();
1552 }
1553 }
1554 vec![json_to_key_value(key)]
1555}
1556
1557fn typed_value_to_json(v: &pb::TypedValue) -> serde_json::Value {
1558 use pb::typed_value::Kind;
1559 match &v.kind {
1560 Some(Kind::NullValue(_)) => serde_json::Value::Null,
1561 Some(Kind::BoolValue(b)) => serde_json::Value::Bool(*b),
1562 Some(Kind::IntValue(i)) => serde_json::json!(*i),
1563 Some(Kind::FloatValue(f)) => serde_json::json!(*f),
1564 Some(Kind::StringValue(s)) => serde_json::Value::String(s.clone()),
1565 Some(Kind::BytesValue(b)) => serde_json::json!(b),
1566 Some(Kind::JsonValue(pv)) => prost_value_to_json(pv),
1567 Some(Kind::TimeValue(ts)) => {
1568 serde_json::Value::String(format!("{}.{}", ts.seconds, ts.nanos))
1569 }
1570 None => serde_json::Value::Null,
1571 }
1572}
1573
1574fn key_range_to_pb(kr: KeyRange) -> pb::KeyRange {
1575 pb::KeyRange {
1576 lower: kr.lower.map(|v| json_to_typed_value(&v)),
1577 upper: kr.upper.map(|v| json_to_typed_value(&v)),
1578 lower_open: kr.lower_open,
1579 upper_open: kr.upper_open,
1580 }
1581}
1582pub fn indexeddb_socket_env(name: &str) -> String {
1584 let trimmed = name.trim();
1585 if trimmed.is_empty() {
1586 return ENV_INDEXEDDB_SOCKET.to_string();
1587 }
1588 let mut env = String::from(ENV_INDEXEDDB_SOCKET);
1589 env.push('_');
1590 for ch in trimmed.chars() {
1591 if ch.is_ascii_alphanumeric() {
1592 env.push(ch.to_ascii_uppercase());
1593 } else {
1594 env.push('_');
1595 }
1596 }
1597 env
1598}
1599
1600pub fn indexeddb_socket_token_env(name: &str) -> String {
1602 format!(
1603 "{}{}",
1604 indexeddb_socket_env(name),
1605 ENV_INDEXEDDB_SOCKET_TOKEN_SUFFIX
1606 )
1607}
1608
1609fn relay_token_interceptor(token: &str) -> Result<RelayTokenInterceptor, IndexedDBError> {
1610 let header = if token.trim().is_empty() {
1611 None
1612 } else {
1613 Some(MetadataValue::try_from(token.to_string()).map_err(|err| {
1614 IndexedDBError::Env(format!("invalid IndexedDB relay token metadata: {err}"))
1615 })?)
1616 };
1617 Ok(RelayTokenInterceptor { header })
1618}
1619
1620#[derive(Clone)]
1621struct RelayTokenInterceptor {
1622 header: Option<MetadataValue<tonic::metadata::Ascii>>,
1623}
1624
1625impl Interceptor for RelayTokenInterceptor {
1626 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, tonic::Status> {
1627 if let Some(header) = self.header.clone() {
1628 request
1629 .metadata_mut()
1630 .insert(INDEXEDDB_RELAY_TOKEN_HEADER, header);
1631 }
1632 Ok(request)
1633 }
1634}