1use std::any::TypeId;
2use std::collections::HashMap;
3use std::fmt::Debug;
4use std::ops::Deref;
5use std::sync::atomic::{AtomicU32, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
11use bonsaidb_core::api::{self, Api, ApiName};
12use bonsaidb_core::arc_bytes::serde::Bytes;
13use bonsaidb_core::arc_bytes::OwnedBytes;
14use bonsaidb_core::connection::{
15    AsyncStorageConnection, Database, HasSession, IdentityReference, Session,
16};
17use bonsaidb_core::networking::{
18    AlterUserPermissionGroupMembership, AlterUserRoleMembership, AssumeIdentity, CreateDatabase,
19    CreateUser, DeleteDatabase, DeleteUser, ListAvailableSchemas, ListDatabases, LogOutSession,
20    MessageReceived, Payload, UnregisterSubscriber, CURRENT_PROTOCOL_VERSION,
21};
22use bonsaidb_core::permissions::Permissions;
23use bonsaidb_core::schema::{Nameable, Schema, SchemaName, SchemaSummary, Schematic};
24use bonsaidb_utils::fast_async_lock;
25use flume::Sender;
26use futures::future::BoxFuture;
27use futures::{Future, FutureExt};
28use parking_lot::Mutex;
29#[cfg(not(target_arch = "wasm32"))]
30use tokio::runtime::Handle;
31use url::Url;
32
33pub use self::remote_database::{AsyncRemoteDatabase, AsyncRemoteSubscriber};
34#[cfg(not(target_arch = "wasm32"))]
35pub use self::sync::{BlockingClient, BlockingRemoteDatabase, BlockingRemoteSubscriber};
36use crate::builder::Async;
37use crate::error::Error;
38use crate::{ApiError, Builder};
39
40#[cfg(not(target_arch = "wasm32"))]
41mod quic_worker;
42mod remote_database;
43#[cfg(not(target_arch = "wasm32"))]
44mod sync;
45#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
46mod tungstenite_worker;
47#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
48mod wasm_websocket_worker;
49
/// Shared registry mapping a subscriber id to the channel sender used to
/// deliver that subscriber's `Message`s.
///
/// The map lives behind an `Arc<Mutex<..>>`, so cloning a `SubscriberMap`
/// yields another handle to the same underlying map.
#[derive(Debug, Clone, Default)]
pub struct SubscriberMap(Arc<Mutex<HashMap<u64, flume::Sender<Message>>>>);
52
53impl SubscriberMap {
54    pub fn clear(&self) {
55        let mut data = self.lock();
56        data.clear();
57    }
58}
59
60impl Deref for SubscriberMap {
61    type Target = Mutex<HashMap<u64, flume::Sender<Message>>>;
62
63    fn deref(&self) -> &Self::Target {
64        &self.0
65    }
66}
67
68use bonsaidb_core::circulate::Message;
69
/// WebSocket error type when the `websockets` feature is enabled on
/// non-`wasm32` targets: an alias for tungstenite's error.
#[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
pub type WebSocketError = tokio_tungstenite::tungstenite::Error;

/// WebSocket error type when the `websockets` feature is enabled on `wasm32`
/// targets: an alias for the error defined by the wasm WebSocket worker.
#[cfg(all(feature = "websockets", target_arch = "wasm32"))]
pub type WebSocketError = wasm_websocket_worker::WebSocketError;
75
/// A handle used to send requests to a remote server.
///
/// Clones share the same `Data` (and therefore the same request channel to
/// the background worker). Each handle additionally carries its own session
/// and request timeout.
#[derive(Debug, Clone)]
pub struct AsyncClient {
    /// State shared by every handle created from the same connection.
    pub(crate) data: Arc<Data>,
    /// The session whose id is attached to requests sent through this handle.
    session: ClientSession,
    /// Maximum time to wait for the response to a single request.
    request_timeout: Duration,
}
233
234impl Drop for AsyncClient {
235    fn drop(&mut self) {
236        if self.session_is_current() && Arc::strong_count(&self.session.session) == 1 {
237            if let Some(session_id) = self.session.session.id {
238                drop(self.invoke_blocking_api_request(&LogOutSession(session_id)));
240            }
241        }
242    }
243}
244
245impl PartialEq for AsyncClient {
246    fn eq(&self, other: &Self) -> bool {
247        Arc::ptr_eq(&self.data, &other.data)
248    }
249}
250
/// State shared between every `AsyncClient` handle of one connection.
#[derive(Debug)]
pub struct Data {
    /// Channel on which outgoing requests are handed to the background worker.
    request_sender: Sender<PendingRequest>,
    /// Value returned by `AsyncClient::effective_permissions`.
    // NOTE(review): nothing in this part of the file writes this field —
    // confirm where it is populated.
    effective_permissions: Mutex<Option<Permissions>>,
    /// Cache of built schematics, keyed by the `TypeId` of the schema type.
    schemas: Mutex<HashMap<TypeId, Arc<Schematic>>>,
    /// Counter shared with the background worker and compared against
    /// `ClientSession::connection_id` to decide whether a session is current.
    // NOTE(review): presumably bumped by the worker on each (re)connection —
    // confirm in the worker modules.
    connection_counter: Arc<AtomicU32>,
    /// Source of ids for outgoing requests.
    request_id: AtomicU32,
    /// Registry of pub/sub subscribers, shared with the worker.
    subscribers: SubscriberMap,
}
260
261impl AsyncClient {
262    pub fn build(url: Url) -> Builder<Async> {
264        Builder::new(url)
265    }
266
267    pub fn new(url: Url) -> Result<Self, Error> {
283        Self::new_from_parts(
284            url,
285            CURRENT_PROTOCOL_VERSION,
286            HashMap::default(),
287            None,
288            None,
289            #[cfg(not(target_arch = "wasm32"))]
290            None,
291            #[cfg(not(target_arch = "wasm32"))]
292            Handle::try_current().ok(),
293        )
294    }
295
296    pub(crate) fn new_from_parts(
312        url: Url,
313        protocol_version: &'static str,
314        mut custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
315        connect_timeout: Option<Duration>,
316        request_timeout: Option<Duration>,
317        #[cfg(not(target_arch = "wasm32"))] certificate: Option<fabruic::Certificate>,
318        #[cfg(not(target_arch = "wasm32"))] tokio: Option<Handle>,
319    ) -> Result<Self, Error> {
320        let subscribers = SubscriberMap::default();
321        let callback_subscribers = subscribers.clone();
322        custom_apis.insert(
323            MessageReceived::name(),
324            Some(Arc::new(ApiCallback::<MessageReceived>::new(
325                move |message: MessageReceived| {
326                    let callback_subscribers = callback_subscribers.clone();
327                    async move {
328                        let mut subscribers = callback_subscribers.lock();
329                        if let Some(sender) = subscribers.get(&message.subscriber_id) {
330                            if sender
331                                .send(bonsaidb_core::circulate::Message {
332                                    topic: OwnedBytes::from(message.topic.into_vec()),
333                                    payload: OwnedBytes::from(message.payload.into_vec()),
334                                })
335                                .is_err()
336                            {
337                                subscribers.remove(&message.subscriber_id);
338                            }
339                        }
340                    }
341                },
342            ))),
343        );
344        let connection = ConnectionInfo {
346            url,
347            subscribers,
348            connect_timeout: connect_timeout.unwrap_or(Duration::from_secs(60)),
349            request_timeout: request_timeout.unwrap_or(Duration::from_secs(60)),
350        };
351        match connection.url.scheme() {
352            #[cfg(not(target_arch = "wasm32"))]
353            "bonsaidb" => Ok(Self::new_bonsai_client(
354                connection,
355                protocol_version,
356                certificate,
357                custom_apis,
358                tokio,
359            )),
360            #[cfg(feature = "websockets")]
361            "wss" | "ws" => Ok(Self::new_websocket_client(
362                connection,
363                protocol_version,
364                custom_apis,
365                #[cfg(not(target_arch = "wasm32"))]
366                tokio,
367            )),
368            other => Err(Error::InvalidUrl(format!("unsupported scheme {other}"))),
369        }
370    }
371
372    #[cfg(not(target_arch = "wasm32"))]
373    fn new_bonsai_client(
374        server: ConnectionInfo,
375        protocol_version: &'static str,
376        certificate: Option<fabruic::Certificate>,
377        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
378        tokio: Option<Handle>,
379    ) -> Self {
380        let (request_sender, request_receiver) = flume::unbounded();
381        let connection_counter = Arc::new(AtomicU32::default());
382        let request_timeout = server.request_timeout;
383        let subscribers = server.subscribers.clone();
384
385        sync::spawn_client(
386            quic_worker::reconnecting_client_loop(
387                server,
388                protocol_version,
389                certificate,
390                request_receiver,
391                Arc::new(custom_apis),
392                connection_counter.clone(),
393            ),
394            tokio,
395        );
396
397        Self {
398            data: Arc::new(Data {
399                request_sender,
400                schemas: Mutex::default(),
401                connection_counter,
402                request_id: AtomicU32::default(),
403                effective_permissions: Mutex::default(),
404                subscribers,
405            }),
406            session: ClientSession::default(),
407            request_timeout,
408        }
409    }
410
411    #[cfg(all(feature = "websockets", not(target_arch = "wasm32")))]
412    fn new_websocket_client(
413        server: ConnectionInfo,
414        protocol_version: &'static str,
415        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
416        tokio: Option<Handle>,
417    ) -> Self {
418        let (request_sender, request_receiver) = flume::unbounded();
419        let connection_counter = Arc::new(AtomicU32::default());
420        let request_timeout = server.request_timeout;
421        let subscribers = server.subscribers.clone();
422
423        sync::spawn_client(
424            tungstenite_worker::reconnecting_client_loop(
425                server,
426                protocol_version,
427                request_receiver,
428                Arc::new(custom_apis),
429                connection_counter.clone(),
430            ),
431            tokio,
432        );
433
434        Self {
435            data: Arc::new(Data {
436                request_sender,
437                schemas: Mutex::default(),
438                request_id: AtomicU32::default(),
439                connection_counter,
440                effective_permissions: Mutex::default(),
441                subscribers,
442            }),
443            session: ClientSession::default(),
444            request_timeout,
445        }
446    }
447
448    #[cfg(all(feature = "websockets", target_arch = "wasm32"))]
449    fn new_websocket_client(
450        server: ConnectionInfo,
451        protocol_version: &'static str,
452        custom_apis: HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
453    ) -> Self {
454        let (request_sender, request_receiver) = flume::unbounded();
455        let connection_counter = Arc::new(AtomicU32::default());
456
457        wasm_websocket_worker::spawn_client(
458            Arc::new(server.url),
459            protocol_version,
460            request_receiver,
461            Arc::new(custom_apis),
462            server.subscribers.clone(),
463            connection_counter.clone(),
464            None,
465            server.connect_timeout,
466        );
467
468        #[cfg(feature = "test-util")]
469        let background_task_running = Arc::new(AtomicBool::new(true));
470
471        Self {
472            data: Arc::new(Data {
473                request_sender,
474                schemas: Mutex::default(),
475                request_id: AtomicU32::default(),
476                connection_counter,
477                effective_permissions: Mutex::default(),
478                subscribers: server.subscribers,
479                #[cfg(feature = "test-util")]
480                background_task_running,
481            }),
482            session: ClientSession::default(),
483            request_timeout: server.request_timeout,
484        }
485    }
486
487    fn send_request_without_confirmation(
488        &self,
489        name: ApiName,
490        bytes: Bytes,
491    ) -> Result<flume::Receiver<Result<Bytes, Error>>, Error> {
492        let (result_sender, result_receiver) = flume::bounded(1);
493        let id = self.data.request_id.fetch_add(1, Ordering::SeqCst);
494        self.data.request_sender.send(PendingRequest {
495            request: Payload {
496                session_id: self.session.session.id,
497                id: Some(id),
498                name,
499                value: Ok(bytes),
500            },
501            responder: result_sender,
502        })?;
503
504        Ok(result_receiver)
505    }
506
    /// Sends a request and asynchronously waits for its response, giving up
    /// after `self.request_timeout`.
    ///
    /// # Errors
    ///
    /// Returns an error when the request cannot be queued, when the response
    /// channel closes before a response arrives, when the response itself is
    /// an error, or when the timeout elapses first.
    async fn send_request_async(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
        let result_receiver = self.send_request_without_confirmation(name, bytes)?;

        // On wasm32 the timeout is implemented with the browser's `setTimeout`:
        // the timer callback fires a oneshot channel that is raced against the
        // response.
        #[cfg(target_arch = "wasm32")]
        let result = {
            use wasm_bindgen::JsCast;
            let (timeout_sender, mut timeout_receiver) = futures::channel::oneshot::channel();
            {
                // When no `window` is available, no timer is armed.
                if let Some(window) = web_sys::window() {
                    let timeout = wasm_bindgen::closure::Closure::once_into_js(move || {
                        let _result = timeout_sender.send(());
                    });
                    // A failure to schedule the timer is deliberately ignored.
                    let _: Result<_, _> = window
                        .set_timeout_with_callback_and_timeout_and_arguments_0(
                            timeout.as_ref().unchecked_ref(),
                            // Durations too large for an `i32` of milliseconds are clamped.
                            self.request_timeout
                                .as_millis()
                                .try_into()
                                .unwrap_or(i32::MAX),
                        );
                }
            }
            futures::select! {
                result = result_receiver.recv_async() => Ok(result),
                _ = timeout_receiver => Err(Error::Network(bonsaidb_core::networking::Error::RequestTimeout)),
            }
        };
        // Elsewhere, Tokio's timer bounds the wait.
        #[cfg(not(target_arch = "wasm32"))]
        let result = tokio::time::timeout(self.request_timeout, result_receiver.recv_async()).await;

        match result {
            // `?` surfaces a closed response channel; the remaining value is
            // the response (or error) delivered by the worker.
            Ok(response) => response?,
            // Whatever the platform-specific timeout error was, report a request timeout.
            Err(_) => Err(Error::request_timeout()),
        }
    }
543
544    #[cfg(not(target_arch = "wasm32"))]
545    fn send_request(&self, name: ApiName, bytes: Bytes) -> Result<Bytes, Error> {
546        let result_receiver = self.send_request_without_confirmation(name, bytes)?;
547
548        result_receiver.recv_timeout(self.request_timeout)?
549    }
550
551    pub async fn send_api_request<Api: api::Api>(
553        &self,
554        request: &Api,
555    ) -> Result<Api::Response, ApiError<Api::Error>> {
556        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
557        let response = self.send_request_async(Api::name(), request).await?;
558        let response =
559            pot::from_slice::<Result<Api::Response, Api::Error>>(&response).map_err(Error::from)?;
560        response.map_err(ApiError::Api)
561    }
562
563    #[cfg(not(target_arch = "wasm32"))]
564    fn send_blocking_api_request<Api: api::Api>(
565        &self,
566        request: &Api,
567    ) -> Result<Api::Response, ApiError<Api::Error>> {
568        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
569        let response = self.send_request(Api::name(), request)?;
570
571        let response =
572            pot::from_slice::<Result<Api::Response, Api::Error>>(&response).map_err(Error::from)?;
573        response.map_err(ApiError::Api)
574    }
575
576    fn invoke_blocking_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
577        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
578        self.send_request_without_confirmation(Api::name(), request)
579            .map(|_| ())
580    }
581
582    #[must_use]
585    pub fn effective_permissions(&self) -> Option<Permissions> {
586        let effective_permissions = self.data.effective_permissions.lock();
587        effective_permissions.clone()
588    }
589
590    pub(crate) fn register_subscriber(&self, id: u64, sender: flume::Sender<Message>) {
591        let mut subscribers = self.data.subscribers.lock();
592        subscribers.insert(id, sender);
593    }
594
595    pub(crate) async fn unregister_subscriber_async(&self, database: String, id: u64) {
596        drop(
597            self.send_api_request(&UnregisterSubscriber {
598                database,
599                subscriber_id: id,
600            })
601            .await,
602        );
603        let mut subscribers = self.data.subscribers.lock();
604        subscribers.remove(&id);
605    }
606
607    #[cfg(not(target_arch = "wasm32"))]
608    pub(crate) fn unregister_subscriber(&self, database: String, id: u64) {
609        drop(self.send_blocking_api_request(&UnregisterSubscriber {
610            database,
611            subscriber_id: id,
612        }));
613        let mut subscribers = self.data.subscribers.lock();
614        subscribers.remove(&id);
615    }
616
617    fn remote_database<DB: bonsaidb_core::schema::Schema>(
618        &self,
619        name: &str,
620    ) -> Result<AsyncRemoteDatabase, bonsaidb_core::Error> {
621        let mut schemas = self.data.schemas.lock();
622        let type_id = TypeId::of::<DB>();
623        let schematic = if let Some(schematic) = schemas.get(&type_id) {
624            schematic.clone()
625        } else {
626            let schematic = Arc::new(DB::schematic()?);
627            schemas.insert(type_id, schematic.clone());
628            schematic
629        };
630        Ok(AsyncRemoteDatabase::new(
631            self.clone(),
632            name.to_string(),
633            schematic,
634        ))
635    }
636
637    fn session_is_current(&self) -> bool {
638        self.session.session.id.is_none()
639            || self.data.connection_counter.load(Ordering::SeqCst) == self.session.connection_id
640    }
641
642    pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
647        self.request_timeout = timeout.into();
648    }
649}
650
651impl HasSession for AsyncClient {
652    fn session(&self) -> Option<&Session> {
653        self.session_is_current().then_some(&self.session.session)
654    }
655}
656
657#[async_trait]
658impl AsyncStorageConnection for AsyncClient {
659    type Authenticated = Self;
660    type Database = AsyncRemoteDatabase;
661
662    async fn admin(&self) -> Self::Database {
663        self.remote_database::<Admin>(ADMIN_DATABASE_NAME).unwrap()
664    }
665
666    async fn create_database_with_schema(
667        &self,
668        name: &str,
669        schema: SchemaName,
670        only_if_needed: bool,
671    ) -> Result<(), bonsaidb_core::Error> {
672        self.send_api_request(&CreateDatabase {
673            database: Database {
674                name: name.to_string(),
675                schema,
676            },
677            only_if_needed,
678        })
679        .await?;
680        Ok(())
681    }
682
683    async fn database<DB: Schema>(
684        &self,
685        name: &str,
686    ) -> Result<Self::Database, bonsaidb_core::Error> {
687        self.remote_database::<DB>(name)
688    }
689
690    async fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
691        self.send_api_request(&DeleteDatabase {
692            name: name.to_string(),
693        })
694        .await?;
695        Ok(())
696    }
697
698    async fn list_databases(&self) -> Result<Vec<Database>, bonsaidb_core::Error> {
699        Ok(self.send_api_request(&ListDatabases).await?)
700    }
701
702    async fn list_available_schemas(&self) -> Result<Vec<SchemaSummary>, bonsaidb_core::Error> {
703        Ok(self.send_api_request(&ListAvailableSchemas).await?)
704    }
705
706    async fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
707        Ok(self
708            .send_api_request(&CreateUser {
709                username: username.to_string(),
710            })
711            .await?)
712    }
713
714    async fn delete_user<'user, U: Nameable<'user, u64> + Send + Sync>(
715        &self,
716        user: U,
717    ) -> Result<(), bonsaidb_core::Error> {
718        Ok(self
719            .send_api_request(&DeleteUser {
720                user: user.name()?.into_owned(),
721            })
722            .await?)
723    }
724
725    #[cfg(feature = "password-hashing")]
726    async fn set_user_password<'user, U: Nameable<'user, u64> + Send + Sync>(
727        &self,
728        user: U,
729        password: bonsaidb_core::connection::SensitiveString,
730    ) -> Result<(), bonsaidb_core::Error> {
731        Ok(self
732            .send_api_request(&bonsaidb_core::networking::SetUserPassword {
733                user: user.name()?.into_owned(),
734                password,
735            })
736            .await?)
737    }
738
739    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
740    async fn authenticate(
741        &self,
742        authentication: bonsaidb_core::connection::Authentication,
743    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
744        let session = self
745            .send_api_request(&bonsaidb_core::networking::Authenticate { authentication })
746            .await?;
747        Ok(Self {
748            data: self.data.clone(),
749            session: ClientSession {
750                session: Arc::new(session),
751                connection_id: self.data.connection_counter.load(Ordering::SeqCst),
752            },
753            request_timeout: self.request_timeout,
754        })
755    }
756
757    async fn assume_identity(
758        &self,
759        identity: IdentityReference<'_>,
760    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
761        let session = self
762            .send_api_request(&AssumeIdentity(identity.into_owned()))
763            .await?;
764        Ok(Self {
765            data: self.data.clone(),
766            session: ClientSession {
767                session: Arc::new(session),
768                connection_id: self.data.connection_counter.load(Ordering::SeqCst),
769            },
770            request_timeout: self.request_timeout,
771        })
772    }
773
774    async fn add_permission_group_to_user<
775        'user,
776        'group,
777        U: Nameable<'user, u64> + Send + Sync,
778        G: Nameable<'group, u64> + Send + Sync,
779    >(
780        &self,
781        user: U,
782        permission_group: G,
783    ) -> Result<(), bonsaidb_core::Error> {
784        self.send_api_request(&AlterUserPermissionGroupMembership {
785            user: user.name()?.into_owned(),
786            group: permission_group.name()?.into_owned(),
787            should_be_member: true,
788        })
789        .await?;
790        Ok(())
791    }
792
793    async fn remove_permission_group_from_user<
794        'user,
795        'group,
796        U: Nameable<'user, u64> + Send + Sync,
797        G: Nameable<'group, u64> + Send + Sync,
798    >(
799        &self,
800        user: U,
801        permission_group: G,
802    ) -> Result<(), bonsaidb_core::Error> {
803        self.send_api_request(&AlterUserPermissionGroupMembership {
804            user: user.name()?.into_owned(),
805            group: permission_group.name()?.into_owned(),
806            should_be_member: false,
807        })
808        .await?;
809        Ok(())
810    }
811
812    async fn add_role_to_user<
813        'user,
814        'group,
815        U: Nameable<'user, u64> + Send + Sync,
816        G: Nameable<'group, u64> + Send + Sync,
817    >(
818        &self,
819        user: U,
820        role: G,
821    ) -> Result<(), bonsaidb_core::Error> {
822        self.send_api_request(&AlterUserRoleMembership {
823            user: user.name()?.into_owned(),
824            role: role.name()?.into_owned(),
825            should_be_member: true,
826        })
827        .await?;
828        Ok(())
829    }
830
831    async fn remove_role_from_user<
832        'user,
833        'group,
834        U: Nameable<'user, u64> + Send + Sync,
835        G: Nameable<'group, u64> + Send + Sync,
836    >(
837        &self,
838        user: U,
839        role: G,
840    ) -> Result<(), bonsaidb_core::Error> {
841        self.send_api_request(&AlterUserRoleMembership {
842            user: user.name()?.into_owned(),
843            role: role.name()?.into_owned(),
844            should_be_member: false,
845        })
846        .await?;
847        Ok(())
848    }
849}
850
/// Requests that have been accepted but not yet answered, keyed by request id.
type OutstandingRequestMap = HashMap<u32, PendingRequest>;
/// Shared, asynchronously lockable handle to an [`OutstandingRequestMap`].
type OutstandingRequestMapHandle = Arc<async_lock::Mutex<OutstandingRequestMap>>;
/// Channel sender used to deliver a request's outcome back to its caller.
type PendingRequestResponder = Sender<Result<Bytes, Error>>;
854
/// A request paired with the channel on which its outcome is reported.
#[derive(Debug)]
pub struct PendingRequest {
    /// The payload to send to the server.
    request: Payload,
    /// Where the response, or the error that prevented one, is delivered.
    responder: PendingRequestResponder,
}
860
861async fn process_response_payload(
862    payload: Payload,
863    outstanding_requests: &OutstandingRequestMapHandle,
864    custom_apis: &HashMap<ApiName, Option<Arc<dyn AnyApiCallback>>>,
865) {
866    if let Some(payload_id) = payload.id {
867        if let Some(outstanding_request) = {
868            let mut outstanding_requests = fast_async_lock!(outstanding_requests);
869            outstanding_requests.remove(&payload_id)
870        } {
871            drop(
872                outstanding_request
873                    .responder
874                    .send(payload.value.map_err(Error::from)),
875            );
876        }
877    } else if let (Some(custom_api_callback), Ok(value)) = (
878        custom_apis.get(&payload.name).and_then(Option::as_ref),
879        payload.value,
880    ) {
881        custom_api_callback.response_received(value).await;
882    } else {
883        log::warn!("unexpected api response received ({})", payload.name);
884    }
885}
886
/// Type-erased callback: turns a received `Response` into a boxed future
/// that handles it.
trait ApiWrapper<Response>: Send + Sync {
    /// Starts handling `response`, returning the future that performs the work.
    fn invoke(&self, response: Response) -> BoxFuture<'static, ()>;
}
890
/// A callback that is invoked with responses of the `Api` type.
pub struct ApiCallback<Api: api::Api> {
    /// The type-erased callback that produces a future for each response.
    generator: Box<dyn ApiWrapper<Api::Response>>,
}
896
/// A thread-safe, `'static` function that takes a `Request` and returns `F`.
///
/// This is a named shorthand for the `Fn(Request) -> F + Send + Sync + 'static`
/// bound; it has no methods of its own.
pub trait ApiCallbackFn<Request, F>: Fn(Request) -> F + Send + Sync + 'static {}

/// Every function or closure that satisfies the bound is an `ApiCallbackFn`.
impl<T, Request, F> ApiCallbackFn<Request, F> for T where T: Fn(Request) -> F + Send + Sync + 'static
{}
902
903struct ApiFutureBoxer<Response: Send + Sync, F: Future<Output = ()> + Send + Sync>(
904    Box<dyn ApiCallbackFn<Response, F>>,
905);
906
907impl<Response: Send + Sync, F: Future<Output = ()> + Send + Sync + 'static> ApiWrapper<Response>
908    for ApiFutureBoxer<Response, F>
909{
910    fn invoke(&self, response: Response) -> BoxFuture<'static, ()> {
911        self.0(response).boxed()
912    }
913}
914
915impl<Api: api::Api> ApiCallback<Api> {
916    pub fn new<
918        F: ApiCallbackFn<Api::Response, Fut>,
919        Fut: Future<Output = ()> + Send + Sync + 'static,
920    >(
921        callback: F,
922    ) -> Self {
923        Self {
924            generator: Box::new(ApiFutureBoxer::<Api::Response, Fut>(Box::new(callback))),
925        }
926    }
927
928    pub fn new_with_context<
933        Context: Send + Sync + Clone + 'static,
934        F: Fn(Api::Response, Context) -> Fut + Send + Sync + 'static,
935        Fut: Future<Output = ()> + Send + Sync + 'static,
936    >(
937        context: Context,
938        callback: F,
939    ) -> Self {
940        Self {
941            generator: Box::new(ApiFutureBoxer::<Api::Response, Fut>(Box::new(
942                move |request| {
943                    let context = context.clone();
944                    callback(request, context)
945                },
946            ))),
947        }
948    }
949}
950
/// A callback that accepts the raw bytes of an API response, independent of
/// the API's concrete types.
#[async_trait]
pub trait AnyApiCallback: Send + Sync + 'static {
    /// Handles the serialized `response` received from the server.
    async fn response_received(&self, response: Bytes);
}
957
958#[async_trait]
959impl<Api: api::Api> AnyApiCallback for ApiCallback<Api> {
960    async fn response_received(&self, response: Bytes) {
961        match pot::from_slice::<Result<Api::Response, Api::Error>>(&response) {
962            Ok(response) => self.generator.invoke(response.unwrap()).await,
963            Err(err) => {
964                log::error!("error deserializing api: {err}");
965            }
966        }
967    }
968}
969
/// A session together with the connection-counter value it was created under.
#[derive(Debug, Clone, Default)]
pub struct ClientSession {
    /// The session, shared between every client handle cloned from the one
    /// that created it.
    session: Arc<Session>,
    /// Value of the shared connection counter when the session was
    /// established; the session is treated as current only while the counter
    /// still has this value.
    connection_id: u32,
}
975
976async fn disconnect_pending_requests(
977    outstanding_requests: &OutstandingRequestMapHandle,
978    pending_error: &mut Option<Error>,
979) {
980    let mut outstanding_requests = fast_async_lock!(outstanding_requests);
981    for (_, pending) in outstanding_requests.drain() {
982        drop(
983            pending
984                .responder
985                .send(Err(pending_error.take().unwrap_or(Error::disconnected()))),
986        );
987    }
988}
989
/// Connection settings handed to a transport-specific client constructor.
struct ConnectionInfo {
    /// The server URL; its scheme selects the transport.
    pub url: Url,
    /// Subscriber registry shared between the client and its worker.
    pub subscribers: SubscriberMap,
    /// Timeout for connecting to the server.
    // The `dead_code` allowance is applied on wasm32 only.
    #[cfg_attr(target_arch = "wasm32", allow(dead_code))]
    pub connect_timeout: Duration,
    /// Default timeout for an individual request's response.
    pub request_timeout: Duration,
}