1use std::any::TypeId;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::ops::Deref;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
11use bonsaidb_core::api::{self, Api, ApiName};
12use bonsaidb_core::arc_bytes::serde::Bytes;
13use bonsaidb_core::arc_bytes::OwnedBytes;
14use bonsaidb_core::connection::{
15 AsyncStorageConnection, Database, HasSession, IdentityReference, Session,
16};
17use bonsaidb_core::networking::{
18 AlterUserPermissionGroupMembership, AlterUserRoleMembership, AssumeIdentity, CreateDatabase,
19 CreateUser, DeleteDatabase, DeleteUser, ListAvailableSchemas, ListDatabases, LogOutSession,
20 MessageReceived, Payload, UnregisterSubscriber, CURRENT_PROTOCOL_VERSION,
21};
22use bonsaidb_core::permissions::Permissions;
23use bonsaidb_core::schema::{Nameable, Schema, SchemaName, SchemaSummary, Schematic};
24use bonsaidb_utils::fast_async_lock;
25use flume::Sender;
26use futures::future::BoxFuture;
27use futures::{Future, FutureExt};
28use parking_lot::Mutex;
29#[cfg(not(target_arch = "wasm32"))]
30use tokio::runtime::Handle;
31use url::Url;
32
33pub use self::remote_database::{AsyncRemoteDatabase, AsyncRemoteSubscriber};
34#[cfg(not(target_arch = "wasm32"))]
35pub use self::sync::{BlockingClient, BlockingRemoteDatabase, BlockingRemoteSubscriber};
36use crate::builder::Async;
37use crate::error::Error;
38use crate::{ApiError, Builder};
39
40#[cfg(not(target_arch = "wasm32"))]
41mod quic_worker;
42mod remote_database;
43#[cfg(not(target_arch = "wasm32"))]
44mod sync;
45#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
46mod tungstenite_worker;
47#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
48mod wasm_websocket_worker;
49
50#[derive(Debug, Clone, Default)]
51pub struct SubscriberMap(Arc<Mutex<HashMap<u64, flume::Sender<Message>>>>);
52
53impl SubscriberMap {
54 pub fn clear(&self) {
55 let mut data = self.lock();
56 data.clear();
57 }
58}
59
60impl Deref for SubscriberMap {
61 type Target = Mutex<HashMap<u64, flume::Sender<Message>>>;
62
63 fn deref(&self) -> &Self::Target {
64 &self.0
65 }
66}
67
68use bonsaidb_core::circulate::Message;
69
70#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
71pub type WebSocketError = tokio_tungstenite::tungstenite::Error;
72
73#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
74pub type WebSocketError = wasm_websocket_worker::WebSocketError;
75
76#[derive(Debug, Clone)]
228pub struct AsyncClient {
229 pub(crate) data: Arc<Data>,
230 session: ClientSession,
231 request_timeout: Duration,
232}
233
234impl Drop for AsyncClient {
235 fn drop(&mut self) {
236 if self.session_is_current() && Arc::strong_count(&self.session.session) == 1 {
237 if let Some(session_id) = self.session.session.id {
238 drop(self.invoke_blocking_api_request(&LogOutSession(session_id)));
240 }
241 }
242 }
243}
244
245impl PartialEq for AsyncClient {
246 fn eq(&self, other: &Self) -> bool {
247 Arc::ptr_eq(&self.data, &other.data)
248 }
249}
250
251#[derive(Debug)]
252pub struct Data {
253 request_sender: Sender<PendingRequest>,
254 effective_permissions: Mutex<Option<Permissions>>,
255 schemas: Mutex<HashMap<TypeId, Arc<Schematic>>>,
256 connection_counter: Arc<AtomicU32>,
257 request_id: AtomicU32,
258 subscribers: SubscriberMap,
259}
260
261impl AsyncClient {
262 pub fn build(url: Url) -> Builder<Async> {
264 Builder::new(url)
265 }
266
267 pub fn new(url: Url) -> Result<Self, Error> {
283 Self::new_from_parts(
284 url,
285 CURRENT_PROTOCOL_VERSION,
286 HashMap::default(),
287 None,
288 None,
289 #[cfg(not(target_arch = "wasm32"))]
290 None,
291 #[cfg(not(target_arch = "wasm32"))]
292 Handle::try_current().ok(),
293 )
294 }
295
296 pub(crate) fn new_from_parts(
312 url: Url,
313 protocol_version: &'static str,
314 mut custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
315 connect_timeout: Option<Duration>,
316 request_timeout: Option<Duration>,
317 #[cfg(not(target_arch = "wasm32"))] certificate: Option<fabruic::Certificate>,
318 #[cfg(not(target_arch = "wasm32"))] tokio: Option<Handle>,
319 ) -> Result<Self, Error> {
320 let subscribers = SubscriberMap::default();
321 let callback_subscribers = subscribers.clone();
322 custom_apis.insert(
323 MessageReceived::name(),
324 Some(Arc::new(ApiCallback::<MessageReceived>::new(
325 move |message: MessageReceived| {
326 let callback_subscribers = callback_subscribers.clone();
327 async move {
328 let mut subscribers = callback_subscribers.lock();
329 if let Some(sender) = subscribers.get(&message.subscriber_id) {
330 if sender
331 .send(bonsaidb_core::circulate::Message {
332 topic: OwnedBytes::from(message.topic.into_vec()),
333 payload: OwnedBytes::from(message.payload.into_vec()),
334 })
335 .is_err()
336 {
337 subscribers.remove(&message.subscriber_id);
338 }
339 }
340 }
341 },
342 ))),
343 );
344 let connection = ConnectionInfo {
346 url,
347 subscribers,
348 connect_timeout: connect_timeout.unwrap_or(Duration::from_secs(60)),
349 request_timeout: request_timeout.unwrap_or(Duration::from_secs(60)),
350 };
351 match connection.url.scheme() {
352 #[cfg(not(target_arch = "wasm32"))]
353 "bonsaidb" => Ok(Self::new_bonsai_client(
354 connection,
355 protocol_version,
356 certificate,
357 custom_apis,
358 tokio,
359 )),
360 #[cfg(feature = "websockets")]
361 "wss" | "ws" => Ok(Self::new_websocket_client(
362 connection,
363 protocol_version,
364 custom_apis,
365 #[cfg(not(target_arch = "wasm32"))]
366 tokio,
367 )),
368 other => Err(Error::InvalidUrl(format!("unsupported scheme {other}"))),
369 }
370 }
371
372 #[cfg(not(target_arch = "wasm32"))]
373 fn new_bonsai_client(
374 server: ConnectionInfo,
375 protocol_version: &'static str,
376 certificate: Option<fabruic::Certificate>,
377 custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
378 tokio: Option<Handle>,
379 ) -> Self {
380 let (request_sender, request_receiver) = flume::unbounded();
381 let connection_counter = Arc::new(AtomicU32::default());
382 let request_timeout = server.request_timeout;
383 let subscribers = server.subscribers.clone();
384
385 sync::spawn_client(
386 quic_worker::reconnecting_client_loop(
387 server,
388 protocol_version,
389 certificate,
390 request_receiver,
391 Arc::new(custom_apis),
392 connection_counter.clone(),
393 ),
394 tokio,
395 );
396
397 Self {
398 data: Arc::new(Data {
399 request_sender,
400 schemas: Mutex::default(),
401 connection_counter,
402 request_id: AtomicU32::default(),
403 effective_permissions: Mutex::default(),
404 subscribers,
405 }),
406 session: ClientSession::default(),
407 request_timeout,
408 }
409 }
410
411 #[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
412 fn new_websocket_client(
413 server: ConnectionInfo,
414 protocol_version: &'static str,
415 custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
416 tokio: Option<Handle>,
417 ) -> Self {
418 let (request_sender, request_receiver) = flume::unbounded();
419 let connection_counter = Arc::new(AtomicU32::default());
420 let request_timeout = server.request_timeout;
421 let subscribers = server.subscribers.clone();
422
423 sync::spawn_client(
424 tungstenite_worker::reconnecting_client_loop(
425 server,
426 protocol_version,
427 request_receiver,
428 Arc::new(custom_apis),
429 connection_counter.clone(),
430 ),
431 tokio,
432 );
433
434 Self {
435 data: Arc::new(Data {
436 request_sender,
437 schemas: Mutex::default(),
438 request_id: AtomicU32::default(),
439 connection_counter,
440 effective_permissions: Mutex::default(),
441 subscribers,
442 }),
443 session: ClientSession::default(),
444 request_timeout,
445 }
446 }
447
448 #[cfg(all(feature = "websockets", target_arch = "wasm32"))]
449 fn new_websocket_client(
450 server: ConnectionInfo,
451 protocol_version: &'static str,
452 custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
453 ) -> Self {
454 let (request_sender, request_receiver) = flume::unbounded();
455 let connection_counter = Arc::new(AtomicU32::default());
456
457 wasm_websocket_worker::spawn_client(
458 Arc::new(server.url),
459 protocol_version,
460 request_receiver,
461 Arc::new(custom_apis),
462 server.subscribers.clone(),
463 connection_counter.clone(),
464 None,
465 server.connect_timeout,
466 );
467
468 #[cfg(feature = "test-util")]
469 let background_task_running = Arc::new(AtomicBool::new(true));
470
471 Self {
472 data: Arc::new(Data {
473 request_sender,
474 schemas: Mutex::default(),
475 request_id: AtomicU32::default(),
476 connection_counter,
477 effective_permissions: Mutex::default(),
478 subscribers: server.subscribers,
479 #[cfg(feature = "test-util")]
480 background_task_running,
481 }),
482 session: ClientSession::default(),
483 request_timeout: server.request_timeout,
484 }
485 }
486
487 fn send_request_without_confirmation(
488 &self,
489 name: ApiName,
490 bytes: Bytes,
491 ) -> Result<flume::Receiver<Result<Bytes, Error>>, Error> {
492 let (result_sender, result_receiver) = flume::bounded(1);
493 let id = self.data.request_id.fetch_add(1, Ordering::SeqCst);
494 self.data.request_sender.send(PendingRequest {
495 request: Payload {
496 session_id: self.session.session.id,
497 id: Some(id),
498 name,
499 value: Ok(bytes),
500 },
501 responder: result_sender,
502 })?;
503
504 Ok(result_receiver)
505 }
506
507 async fn send_request_async(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
508 let result_receiver = self.send_request_without_confirmation(name, bytes)?;
509
510 #[cfg(target_arch = "wasm32")]
511 let result = {
512 use wasm_bindgen::JsCast;
513 let (timeout_sender, mut timeout_receiver) = futures::channel::oneshot::channel();
514 {
516 if let Some(window) = web_sys::window() {
517 let timeout = wasm_bindgen::closure::Closure::once_into_js(move || {
518 let _result = timeout_sender.send(());
519 });
520 let _: Result<_, _> = window
521 .set_timeout_with_callback_and_timeout_and_arguments_0(
522 timeout.as_ref().unchecked_ref(),
523 self.request_timeout
524 .as_millis()
525 .try_into()
526 .unwrap_or(i32::MAX),
527 );
528 }
529 }
530 futures::select! {
531 result = result_receiver.recv_async() => Ok(result),
532 _ = timeout_receiver => Err(Error::Network(bonsaidb_core::networking::Error::RequestTimeout)),
533 }
534 };
535 #[cfg(not(target_arch = "wasm32"))]
536 let result = tokio::time::timeout(self.request_timeout, result_receiver.recv_async()).await;
537
538 match result {
539 Ok(response) => response?,
540 Err(_) => Err(Error::request_timeout()),
541 }
542 }
543
544 #[cfg(not(target_arch = "wasm32"))]
545 fn send_request(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
546 let result_receiver = self.send_request_without_confirmation(name, bytes)?;
547
548 result_receiver.recv_timeout(self.request_timeout)?
549 }
550
551 pub async fn send_api_request<Api: api::Api>(
553 &self,
554 request: &Api,
555 ) -> Result<Api::Response, ApiError<Api::Error>> {
556 let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
557 let response = self.send_request_async(Api::name(), request).await?;
558 let response =
559 pot::from_slice::<Result<Api::Response, Api::Error>>(&response).map_err(Error::from)?;
560 response.map_err(ApiError::Api)
561 }
562
563 #[cfg(not(target_arch = "wasm32"))]
564 fn send_blocking_api_request<Api: api::Api>(
565 &self,
566 request: &Api,
567 ) -> Result<Api::Response, ApiError<Api::Error>> {
568 let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
569 let response = self.send_request(Api::name(), request)?;
570
571 let response =
572 pot::from_slice::<Result<Api::Response, Api::Error>>(&response).map_err(Error::from)?;
573 response.map_err(ApiError::Api)
574 }
575
576 fn invoke_blocking_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
577 let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
578 self.send_request_without_confirmation(Api::name(), request)
579 .map(|_| ())
580 }
581
582 #[must_use]
585 pub fn effective_permissions(&self) -> Option<Permissions> {
586 let effective_permissions = self.data.effective_permissions.lock();
587 effective_permissions.clone()
588 }
589
590 pub(crate) fn register_subscriber(&self, id: u64, sender: flume::Sender<Message>) {
591 let mut subscribers = self.data.subscribers.lock();
592 subscribers.insert(id, sender);
593 }
594
595 pub(crate) async fn unregister_subscriber_async(&self, database: String, id: u64) {
596 drop(
597 self.send_api_request(&UnregisterSubscriber {
598 database,
599 subscriber_id: id,
600 })
601 .await,
602 );
603 let mut subscribers = self.data.subscribers.lock();
604 subscribers.remove(&id);
605 }
606
607 #[cfg(not(target_arch = "wasm32"))]
608 pub(crate) fn unregister_subscriber(&self, database: String, id: u64) {
609 drop(self.send_blocking_api_request(&UnregisterSubscriber {
610 database,
611 subscriber_id: id,
612 }));
613 let mut subscribers = self.data.subscribers.lock();
614 subscribers.remove(&id);
615 }
616
617 fn remote_database<DB: bonsaidb_core::schema::Schema>(
618 &self,
619 name: &str,
620 ) -> Result<AsyncRemoteDatabase, bonsaidb_core::Error> {
621 let mut schemas = self.data.schemas.lock();
622 let type_id = TypeId::of::<DB>();
623 let schematic = if let Some(schematic) = schemas.get(&type_id) {
624 schematic.clone()
625 } else {
626 let schematic = Arc::new(DB::schematic()?);
627 schemas.insert(type_id, schematic.clone());
628 schematic
629 };
630 Ok(AsyncRemoteDatabase::new(
631 self.clone(),
632 name.to_string(),
633 schematic,
634 ))
635 }
636
637 fn session_is_current(&self) -> bool {
638 self.session.session.id.is_none()
639 || self.data.connection_counter.load(Ordering::SeqCst) == self.session.connection_id
640 }
641
642 pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
647 self.request_timeout = timeout.into();
648 }
649}
650
651impl HasSession for AsyncClient {
652 fn session(&self) -> Option<&Session> {
653 self.session_is_current().then_some(&self.session.session)
654 }
655}
656
657#[async_trait]
658impl AsyncStorageConnection for AsyncClient {
659 type Authenticated = Self;
660 type Database = AsyncRemoteDatabase;
661
662 async fn admin(&self) -> Self::Database {
663 self.remote_database::<Admin>(ADMIN_DATABASE_NAME).unwrap()
664 }
665
666 async fn create_database_with_schema(
667 &self,
668 name: &str,
669 schema: SchemaName,
670 only_if_needed: bool,
671 ) -> Result<(), bonsaidb_core::Error> {
672 self.send_api_request(&CreateDatabase {
673 database: Database {
674 name: name.to_string(),
675 schema,
676 },
677 only_if_needed,
678 })
679 .await?;
680 Ok(())
681 }
682
683 async fn database<DB: Schema>(
684 &self,
685 name: &str,
686 ) -> Result<Self::Database, bonsaidb_core::Error> {
687 self.remote_database::<DB>(name)
688 }
689
690 async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
691 self.send_api_request(&DeleteDatabase {
692 name: name.to_string(),
693 })
694 .await?;
695 Ok(())
696 }
697
698 async fn list_databases(&self) -> Result<Vec<Database>, bonsaidb_core::Error> {
699 Ok(self.send_api_request(&ListDatabases).await?)
700 }
701
702 async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
703 Ok(self.send_api_request(&ListAvailableSchemas).await?)
704 }
705
706 async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
707 Ok(self
708 .send_api_request(&CreateUser {
709 username: username.to_string(),
710 })
711 .await?)
712 }
713
714 async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
715 &self,
716 user: U,
717 ) -> Result<(), bonsaidb_core::Error> {
718 Ok(self
719 .send_api_request(&DeleteUser {
720 user: user.name()?.into_owned(),
721 })
722 .await?)
723 }
724
725 #[cfg(feature = "password-hashing")]
726 async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
727 &self,
728 user: U,
729 password: bonsaidb_core::connection::SensitiveString,
730 ) -> Result<(), bonsaidb_core::Error> {
731 Ok(self
732 .send_api_request(&bonsaidb_core::networking::SetUserPassword {
733 user: user.name()?.into_owned(),
734 password,
735 })
736 .await?)
737 }
738
739 #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
740 async fn authenticate(
741 &self,
742 authentication: bonsaidb_core::connection::Authentication,
743 ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
744 let session = self
745 .send_api_request(&bonsaidb_core::networking::Authenticate { authentication })
746 .await?;
747 Ok(Self {
748 data: self.data.clone(),
749 session: ClientSession {
750 session: Arc::new(session),
751 connection_id: self.data.connection_counter.load(Ordering::SeqCst),
752 },
753 request_timeout: self.request_timeout,
754 })
755 }
756
757 async fn assume_identity(
758 &self,
759 identity: IdentityReference<'_>,
760 ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
761 let session = self
762 .send_api_request(&AssumeIdentity(identity.into_owned()))
763 .await?;
764 Ok(Self {
765 data: self.data.clone(),
766 session: ClientSession {
767 session: Arc::new(session),
768 connection_id: self.data.connection_counter.load(Ordering::SeqCst),
769 },
770 request_timeout: self.request_timeout,
771 })
772 }
773
774 async fn add_permission_group_to_user<
775 'user,
776 'group,
777 U: Nameable<'user, u64> + Send + Sync,
778 G: Nameable<'group, u64> + Send + Sync,
779 >(
780 &self,
781 user: U,
782 permission_group: G,
783 ) -> Result<(), bonsaidb_core::Error> {
784 self.send_api_request(&AlterUserPermissionGroupMembership {
785 user: user.name()?.into_owned(),
786 group: permission_group.name()?.into_owned(),
787 should_be_member: true,
788 })
789 .await?;
790 Ok(())
791 }
792
793 async fn remove_permission_group_from_user<
794 'user,
795 'group,
796 U: Nameable<'user, u64> + Send + Sync,
797 G: Nameable<'group, u64> + Send + Sync,
798 >(
799 &self,
800 user: U,
801 permission_group: G,
802 ) -> Result<(), bonsaidb_core::Error> {
803 self.send_api_request(&AlterUserPermissionGroupMembership {
804 user: user.name()?.into_owned(),
805 group: permission_group.name()?.into_owned(),
806 should_be_member: false,
807 })
808 .await?;
809 Ok(())
810 }
811
812 async fn add_role_to_user<
813 'user,
814 'group,
815 U: Nameable<'user, u64> + Send + Sync,
816 G: Nameable<'group, u64> + Send + Sync,
817 >(
818 &self,
819 user: U,
820 role: G,
821 ) -> Result<(), bonsaidb_core::Error> {
822 self.send_api_request(&AlterUserRoleMembership {
823 user: user.name()?.into_owned(),
824 role: role.name()?.into_owned(),
825 should_be_member: true,
826 })
827 .await?;
828 Ok(())
829 }
830
831 async fn remove_role_from_user<
832 'user,
833 'group,
834 U: Nameable<'user, u64> + Send + Sync,
835 G: Nameable<'group, u64> + Send + Sync,
836 >(
837 &self,
838 user: U,
839 role: G,
840 ) -> Result<(), bonsaidb_core::Error> {
841 self.send_api_request(&AlterUserRoleMembership {
842 user: user.name()?.into_owned(),
843 role: role.name()?.into_owned(),
844 should_be_member: false,
845 })
846 .await?;
847 Ok(())
848 }
849}
850
851type OutstandingRequestMap = HashMap<u32, PendingRequest>;
852type OutstandingRequestMapHandle = Arc<async_lock::Mutex<OutstandingRequestMap>>;
853type PendingRequestResponder = Sender<Result<Bytes, Error>>;
854
855#[derive(Debug)]
856pub struct PendingRequest {
857 request: Payload,
858 responder: PendingRequestResponder,
859}
860
861async fn process_response_payload(
862 payload: Payload,
863 outstanding_requests: &OutstandingRequestMapHandle,
864 custom_apis: &HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
865) {
866 if let Some(payload_id) = payload.id {
867 if let Some(outstanding_request) = {
868 let mut outstanding_requests = fast_async_lock!(outstanding_requests);
869 outstanding_requests.remove(&payload_id)
870 } {
871 drop(
872 outstanding_request
873 .responder
874 .send(payload.value.map_err(Error::from)),
875 );
876 }
877 } else if let (Some(custom_api_callback), Ok(value)) = (
878 custom_apis.get(&payload.name).and_then(Option::as_ref),
879 payload.value,
880 ) {
881 custom_api_callback.response_received(value).await;
882 } else {
883 log::warn!("unexpected api response received ({})", payload.name);
884 }
885}
886
887trait ApiWrapper<Response>: Send + Sync {
888 fn invoke(&self, response: Response) -> BoxFuture<'static, ()>;
889}
890
891pub struct ApiCallback<Api: api::Api> {
894 generator: Box<dyn ApiWrapper<Api::Response>>,
895}
896
897pub trait ApiCallbackFn<Request, F>: Fn(Request) -> F + Send + Sync + 'static {}
899
900impl<T, Request, F> ApiCallbackFn<Request, F> for T where T: Fn(Request) -> F + Send + Sync + 'static
901{}
902
903struct ApiFutureBoxer<Response: Send + Sync, F: Future<Output = ()> + Send + Sync>(
904 Box<dyn ApiCallbackFn<Response, F>>,
905);
906
907impl<Response: Send + Sync, F: Future<Output = ()> + Send + Sync + 'static> ApiWrapper<Response>
908 for ApiFutureBoxer<Response, F>
909{
910 fn invoke(&self, response: Response) -> BoxFuture<'static, ()> {
911 self.0(response).boxed()
912 }
913}
914
915impl<Api: api::Api> ApiCallback<Api> {
916 pub fn new<
918 F: ApiCallbackFn<Api::Response, Fut>,
919 Fut: Future<Output = ()> + Send + Sync + 'static,
920 >(
921 callback: F,
922 ) -> Self {
923 Self {
924 generator: Box::new(ApiFutureBoxer::<Api::Response, Fut>(Box::new(callback))),
925 }
926 }
927
928 pub fn new_with_context<
933 Context: Send + Sync + Clone + 'static,
934 F: Fn(Api::Response, Context) -> Fut + Send + Sync + 'static,
935 Fut: Future<Output = ()> + Send + Sync + 'static,
936 >(
937 context: Context,
938 callback: F,
939 ) -> Self {
940 Self {
941 generator: Box::new(ApiFutureBoxer::<Api::Response, Fut>(Box::new(
942 move |request| {
943 let context = context.clone();
944 callback(request, context)
945 },
946 ))),
947 }
948 }
949}
950
951#[async_trait]
952pub trait AnyApiCallback: Send + Sync + 'static {
953 async fn response_received(&self, response: Bytes);
956}
957
958#[async_trait]
959impl<Api: api::Api> AnyApiCallback for ApiCallback<Api> {
960 async fn response_received(&self, response: Bytes) {
961 match pot::from_slice::<Result<Api::Response, Api::Error>>(&response) {
962 Ok(response) => self.generator.invoke(response.unwrap()).await,
963 Err(err) => {
964 log::error!("error deserializing api: {err}");
965 }
966 }
967 }
968}
969
970#[derive(Debug, Clone, Default)]
971pub struct ClientSession {
972 session: Arc<Session>,
973 connection_id: u32,
974}
975
976async fn disconnect_pending_requests(
977 outstanding_requests: &OutstandingRequestMapHandle,
978 pending_error: &mut Option<Error>,
979) {
980 let mut outstanding_requests = fast_async_lock!(outstanding_requests);
981 for (_, pending) in outstanding_requests.drain() {
982 drop(
983 pending
984 .responder
985 .send(Err(pending_error.take().unwrap_or(Error::disconnected()))),
986 );
987 }
988}
989
990struct ConnectionInfo {
991 pub url: Url,
992 pub subscribers: SubscriberMap,
993 #[cfg_attr(target_arch = "wasm32", allow(dead_code))]
994 pub connect_timeout: Duration,
995 pub request_timeout: Duration,
996}