bonsaidb_client/client/
sync.rs

1use std::collections::HashMap;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4use std::time::Duration;
5
6use bonsaidb_core::admin::{Admin, ADMIN_DATABASE_NAME};
7use bonsaidb_core::api;
8use bonsaidb_core::arc_bytes::serde::Bytes;
9use bonsaidb_core::connection::{
10    AccessPolicy, Connection, Database, HasSchema, HasSession, IdentityReference,
11    LowLevelConnection, Range, SerializedQueryKey, Sort, StorageConnection,
12};
13use bonsaidb_core::document::{DocumentId, Header, OwnedDocument};
14use bonsaidb_core::keyvalue::KeyValue;
15use bonsaidb_core::networking::{
16    AlterUserPermissionGroupMembership, AlterUserRoleMembership, ApplyTransaction, AssumeIdentity,
17    Compact, CompactCollection, CompactKeyValueStore, Count, CreateDatabase, CreateSubscriber,
18    CreateUser, DeleteDatabase, DeleteDocs, DeleteUser, ExecuteKeyOperation, Get, GetMultiple,
19    LastTransactionId, List, ListAvailableSchemas, ListDatabases, ListExecutedTransactions,
20    ListHeaders, Publish, PublishToAll, Query, QueryWithDocs, Reduce, ReduceGrouped, SubscribeTo,
21    UnsubscribeFrom, CURRENT_PROTOCOL_VERSION,
22};
23use bonsaidb_core::pubsub::{AsyncSubscriber, PubSub, Receiver, Subscriber};
24use bonsaidb_core::schema::view::map;
25use bonsaidb_core::schema::{CollectionName, ViewName};
26use futures::Future;
27use tokio::runtime::{Handle, Runtime};
28use tokio::sync::oneshot;
29use tokio::task::JoinHandle;
30use url::Url;
31
32use crate::builder::Blocking;
33use crate::client::ClientSession;
34use crate::{ApiError, AsyncClient, AsyncRemoteDatabase, AsyncRemoteSubscriber, Builder, Error};
35
36/// A BonsaiDb client that blocks the current thread when performing requests.
37#[derive(Debug, Clone)]
38pub struct BlockingClient(pub(crate) AsyncClient);
39
40impl BlockingClient {
41    /// Returns a builder for a new client connecting to `url`.
42    pub fn build(url: Url) -> Builder<Blocking> {
43        Builder::new(url)
44    }
45
46    /// Initialize a client connecting to `url`. This client can be shared by
47    /// cloning it. All requests are done asynchronously over the same
48    /// connection.
49    ///
50    /// If the client has an error connecting, the first request made will
51    /// present that error. If the client disconnects while processing requests,
52    /// all requests being processed will exit and return
53    /// [`Error::Disconnected`](bonsaidb_core::networking::Error::Disconnected).
54    /// The client will automatically try reconnecting.
55    ///
56    /// The goal of this design of this reconnection strategy is to make it
57    /// easier to build resilliant apps. By allowing existing Client instances
58    /// to recover and reconnect, each component of the apps built can adopt a
59    /// "retry-to-recover" design, or "abort-and-fail" depending on how critical
60    /// the database is to operation.
61    pub fn new(url: Url) -> Result<Self, Error> {
62        AsyncClient::new_from_parts(
63            url,
64            CURRENT_PROTOCOL_VERSION,
65            HashMap::default(),
66            None,
67            None,
68            #[cfg(not(target_arch = "wasm32"))]
69            None,
70            #[cfg(not(target_arch = "wasm32"))]
71            Handle::try_current().ok(),
72        )
73        .map(Self)
74    }
75
76    /// Sends an api `request`.
77    pub fn send_api_request<Api: api::Api>(
78        &self,
79        request: &Api,
80    ) -> Result<Api::Response, ApiError<Api::Error>> {
81        self.0.send_blocking_api_request(request)
82    }
83
84    /// Sends an api `request` without waiting for a result. The response from
85    /// the server will be ignored.
86    pub fn invoke_api_request<Api: api::Api>(&self, request: &Api) -> Result<(), Error> {
87        let request = Bytes::from(pot::to_vec(request).map_err(Error::from)?);
88        self.0
89            .send_request_without_confirmation(Api::name(), request)
90            .map(|_| ())
91    }
92
93    /// Returns a reference to an async-compatible version of this client.
94    #[must_use]
95    pub fn as_async(&self) -> &AsyncClient {
96        &self.0
97    }
98
99    /// Sets this instance's request timeout.
100    ///
101    /// Each client has its own timeout. When cloning a client, this timeout
102    /// setting will be copied to the clone.
103    pub fn set_request_timeout(&mut self, timeout: impl Into<Duration>) {
104        self.0.request_timeout = timeout.into();
105    }
106}
107
108impl From<AsyncClient> for BlockingClient {
109    fn from(client: AsyncClient) -> Self {
110        Self(client)
111    }
112}
113
114impl From<BlockingClient> for AsyncClient {
115    fn from(client: BlockingClient) -> Self {
116        client.0
117    }
118}
119
120impl StorageConnection for BlockingClient {
121    type Authenticated = Self;
122    type Database = BlockingRemoteDatabase;
123
124    fn admin(&self) -> Self::Database {
125        BlockingRemoteDatabase(
126            self.0
127                .remote_database::<Admin>(ADMIN_DATABASE_NAME)
128                .unwrap(),
129        )
130    }
131
132    fn database<DB: bonsaidb_core::schema::Schema>(
133        &self,
134        name: &str,
135    ) -> Result<Self::Database, bonsaidb_core::Error> {
136        self.0
137            .remote_database::<DB>(name)
138            .map(BlockingRemoteDatabase)
139    }
140
141    fn create_database_with_schema(
142        &self,
143        name: &str,
144        schema: bonsaidb_core::schema::SchemaName,
145        only_if_needed: bool,
146    ) -> Result<(), bonsaidb_core::Error> {
147        self.send_api_request(&CreateDatabase {
148            database: Database {
149                name: name.to_string(),
150                schema,
151            },
152            only_if_needed,
153        })?;
154        Ok(())
155    }
156
157    fn delete_database(&self, name: &str) -> Result<(), bonsaidb_core::Error> {
158        self.send_api_request(&DeleteDatabase {
159            name: name.to_string(),
160        })?;
161        Ok(())
162    }
163
164    fn list_databases(
165        &self,
166    ) -> Result<Vec<bonsaidb_core::connection::Database>, bonsaidb_core::Error> {
167        Ok(self.send_api_request(&ListDatabases)?)
168    }
169
170    fn list_available_schemas(
171        &self,
172    ) -> Result<Vec<bonsaidb_core::schema::SchemaSummary>, bonsaidb_core::Error> {
173        Ok(self.send_api_request(&ListAvailableSchemas)?)
174    }
175
176    fn create_user(&self, username: &str) -> Result<u64, bonsaidb_core::Error> {
177        Ok(self.send_api_request(&CreateUser {
178            username: username.to_string(),
179        })?)
180    }
181
182    fn delete_user<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
183        &self,
184        user: U,
185    ) -> Result<(), bonsaidb_core::Error> {
186        Ok(self.send_api_request(&DeleteUser {
187            user: user.name()?.into_owned(),
188        })?)
189    }
190
191    #[cfg(feature = "password-hashing")]
192    fn set_user_password<'user, U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync>(
193        &self,
194        user: U,
195        password: bonsaidb_core::connection::SensitiveString,
196    ) -> Result<(), bonsaidb_core::Error> {
197        use bonsaidb_core::networking::SetUserPassword;
198
199        Ok(self.send_api_request(&SetUserPassword {
200            user: user.name()?.into_owned(),
201            password,
202        })?)
203    }
204
205    #[cfg(any(feature = "token-authentication", feature = "password-hashing"))]
206    fn authenticate(
207        &self,
208        authentication: bonsaidb_core::connection::Authentication,
209    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
210        let session =
211            self.send_api_request(&bonsaidb_core::networking::Authenticate { authentication })?;
212        Ok(Self(AsyncClient {
213            data: self.0.data.clone(),
214            session: ClientSession {
215                session: Arc::new(session),
216                connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
217            },
218            request_timeout: self.0.request_timeout,
219        }))
220    }
221
222    fn assume_identity(
223        &self,
224        identity: IdentityReference<'_>,
225    ) -> Result<Self::Authenticated, bonsaidb_core::Error> {
226        let session = self.send_api_request(&AssumeIdentity(identity.into_owned()))?;
227        Ok(Self(AsyncClient {
228            data: self.0.data.clone(),
229            session: ClientSession {
230                session: Arc::new(session),
231                connection_id: self.0.data.connection_counter.load(Ordering::SeqCst),
232            },
233            request_timeout: self.0.request_timeout,
234        }))
235    }
236
237    fn add_permission_group_to_user<
238        'user,
239        'group,
240        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
241        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
242    >(
243        &self,
244        user: U,
245        permission_group: G,
246    ) -> Result<(), bonsaidb_core::Error> {
247        self.send_api_request(&AlterUserPermissionGroupMembership {
248            user: user.name()?.into_owned(),
249            group: permission_group.name()?.into_owned(),
250            should_be_member: true,
251        })?;
252        Ok(())
253    }
254
255    fn remove_permission_group_from_user<
256        'user,
257        'group,
258        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
259        G: bonsaidb_core::schema::Nameable<'group, u64> + Send + Sync,
260    >(
261        &self,
262        user: U,
263        permission_group: G,
264    ) -> Result<(), bonsaidb_core::Error> {
265        self.send_api_request(&AlterUserPermissionGroupMembership {
266            user: user.name()?.into_owned(),
267            group: permission_group.name()?.into_owned(),
268            should_be_member: false,
269        })?;
270        Ok(())
271    }
272
273    fn add_role_to_user<
274        'user,
275        'role,
276        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
277        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
278    >(
279        &self,
280        user: U,
281        role: R,
282    ) -> Result<(), bonsaidb_core::Error> {
283        self.send_api_request(&AlterUserRoleMembership {
284            user: user.name()?.into_owned(),
285            role: role.name()?.into_owned(),
286            should_be_member: true,
287        })?;
288        Ok(())
289    }
290
291    fn remove_role_from_user<
292        'user,
293        'role,
294        U: bonsaidb_core::schema::Nameable<'user, u64> + Send + Sync,
295        R: bonsaidb_core::schema::Nameable<'role, u64> + Send + Sync,
296    >(
297        &self,
298        user: U,
299        role: R,
300    ) -> Result<(), bonsaidb_core::Error> {
301        self.send_api_request(&AlterUserRoleMembership {
302            user: user.name()?.into_owned(),
303            role: role.name()?.into_owned(),
304            should_be_member: false,
305        })?;
306        Ok(())
307    }
308}
309
310impl HasSession for BlockingClient {
311    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
312        self.0.session()
313    }
314}
315
316/// A remote database that blocks the current thread when performing its
317/// requests.
318#[derive(Debug, Clone)]
319pub struct BlockingRemoteDatabase(AsyncRemoteDatabase);
320
321impl Connection for BlockingRemoteDatabase {
322    type Storage = BlockingClient;
323
324    fn storage(&self) -> Self::Storage {
325        BlockingClient(self.0.client.clone())
326    }
327
328    fn list_executed_transactions(
329        &self,
330        starting_id: Option<u64>,
331        result_limit: Option<u32>,
332    ) -> Result<Vec<bonsaidb_core::transaction::Executed>, bonsaidb_core::Error> {
333        Ok(self
334            .0
335            .client
336            .send_blocking_api_request(&ListExecutedTransactions {
337                database: self.0.name.to_string(),
338                starting_id,
339                result_limit,
340            })?)
341    }
342
343    fn last_transaction_id(&self) -> Result<Option<u64>, bonsaidb_core::Error> {
344        Ok(self
345            .0
346            .client
347            .send_blocking_api_request(&LastTransactionId {
348                database: self.0.name.to_string(),
349            })?)
350    }
351
352    fn compact(&self) -> Result<(), bonsaidb_core::Error> {
353        self.0.send_blocking_api_request(&Compact {
354            database: self.0.name.to_string(),
355        })?;
356        Ok(())
357    }
358
359    fn compact_key_value_store(&self) -> Result<(), bonsaidb_core::Error> {
360        self.0.send_blocking_api_request(&CompactKeyValueStore {
361            database: self.0.name.to_string(),
362        })?;
363        Ok(())
364    }
365}
366
367impl LowLevelConnection for BlockingRemoteDatabase {
368    fn apply_transaction(
369        &self,
370        transaction: bonsaidb_core::transaction::Transaction,
371    ) -> Result<Vec<bonsaidb_core::transaction::OperationResult>, bonsaidb_core::Error> {
372        Ok(self.0.client.send_blocking_api_request(&ApplyTransaction {
373            database: self.0.name.to_string(),
374            transaction,
375        })?)
376    }
377
378    fn get_from_collection(
379        &self,
380        id: bonsaidb_core::document::DocumentId,
381        collection: &CollectionName,
382    ) -> Result<Option<OwnedDocument>, bonsaidb_core::Error> {
383        Ok(self.0.client.send_blocking_api_request(&Get {
384            database: self.0.name.to_string(),
385            collection: collection.clone(),
386            id,
387        })?)
388    }
389
390    fn get_multiple_from_collection(
391        &self,
392        ids: &[bonsaidb_core::document::DocumentId],
393        collection: &CollectionName,
394    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
395        Ok(self.0.client.send_blocking_api_request(&GetMultiple {
396            database: self.0.name.to_string(),
397            collection: collection.clone(),
398            ids: ids.to_vec(),
399        })?)
400    }
401
402    fn list_from_collection(
403        &self,
404        ids: Range<bonsaidb_core::document::DocumentId>,
405        order: Sort,
406        limit: Option<u32>,
407        collection: &CollectionName,
408    ) -> Result<Vec<OwnedDocument>, bonsaidb_core::Error> {
409        Ok(self.0.client.send_blocking_api_request(&List {
410            database: self.0.name.to_string(),
411            collection: collection.clone(),
412            ids,
413            order,
414            limit,
415        })?)
416    }
417
418    fn list_headers_from_collection(
419        &self,
420        ids: Range<DocumentId>,
421        order: Sort,
422        limit: Option<u32>,
423        collection: &CollectionName,
424    ) -> Result<Vec<Header>, bonsaidb_core::Error> {
425        Ok(self.0.client.send_blocking_api_request(&ListHeaders(List {
426            database: self.0.name.to_string(),
427            collection: collection.clone(),
428            ids,
429            order,
430            limit,
431        }))?)
432    }
433
434    fn count_from_collection(
435        &self,
436        ids: Range<bonsaidb_core::document::DocumentId>,
437        collection: &CollectionName,
438    ) -> Result<u64, bonsaidb_core::Error> {
439        Ok(self.0.client.send_blocking_api_request(&Count {
440            database: self.0.name.to_string(),
441            collection: collection.clone(),
442            ids,
443        })?)
444    }
445
446    fn compact_collection_by_name(
447        &self,
448        collection: CollectionName,
449    ) -> Result<(), bonsaidb_core::Error> {
450        self.0.send_blocking_api_request(&CompactCollection {
451            database: self.0.name.to_string(),
452            name: collection,
453        })?;
454        Ok(())
455    }
456
457    fn query_by_name(
458        &self,
459        view: &ViewName,
460        key: Option<SerializedQueryKey>,
461        order: Sort,
462        limit: Option<u32>,
463        access_policy: AccessPolicy,
464    ) -> Result<Vec<map::Serialized>, bonsaidb_core::Error> {
465        Ok(self.0.client.send_blocking_api_request(&Query {
466            database: self.0.name.to_string(),
467            view: view.clone(),
468            key,
469            order,
470            limit,
471            access_policy,
472        })?)
473    }
474
475    fn query_by_name_with_docs(
476        &self,
477        view: &bonsaidb_core::schema::ViewName,
478        key: Option<SerializedQueryKey>,
479        order: Sort,
480        limit: Option<u32>,
481        access_policy: AccessPolicy,
482    ) -> Result<bonsaidb_core::schema::view::map::MappedSerializedDocuments, bonsaidb_core::Error>
483    {
484        Ok(self
485            .0
486            .client
487            .send_blocking_api_request(&QueryWithDocs(Query {
488                database: self.0.name.to_string(),
489                view: view.clone(),
490                key,
491                order,
492                limit,
493                access_policy,
494            }))?)
495    }
496
497    fn reduce_by_name(
498        &self,
499        view: &bonsaidb_core::schema::ViewName,
500        key: Option<SerializedQueryKey>,
501        access_policy: AccessPolicy,
502    ) -> Result<Vec<u8>, bonsaidb_core::Error> {
503        Ok(self
504            .0
505            .client
506            .send_blocking_api_request(&Reduce {
507                database: self.0.name.to_string(),
508                view: view.clone(),
509                key,
510                access_policy,
511            })?
512            .into_vec())
513    }
514
515    fn reduce_grouped_by_name(
516        &self,
517        view: &bonsaidb_core::schema::ViewName,
518        key: Option<SerializedQueryKey>,
519        access_policy: AccessPolicy,
520    ) -> Result<Vec<bonsaidb_core::schema::view::map::MappedSerializedValue>, bonsaidb_core::Error>
521    {
522        Ok(self
523            .0
524            .client
525            .send_blocking_api_request(&ReduceGrouped(Reduce {
526                database: self.0.name.to_string(),
527                view: view.clone(),
528                key,
529                access_policy,
530            }))?)
531    }
532
533    fn delete_docs_by_name(
534        &self,
535        view: &bonsaidb_core::schema::ViewName,
536        key: Option<SerializedQueryKey>,
537        access_policy: AccessPolicy,
538    ) -> Result<u64, bonsaidb_core::Error> {
539        Ok(self.0.client.send_blocking_api_request(&DeleteDocs {
540            database: self.0.name.to_string(),
541            view: view.clone(),
542            key,
543            access_policy,
544        })?)
545    }
546}
547
548impl HasSession for BlockingRemoteDatabase {
549    fn session(&self) -> Option<&bonsaidb_core::connection::Session> {
550        self.0.session()
551    }
552}
553
554impl HasSchema for BlockingRemoteDatabase {
555    fn schematic(&self) -> &bonsaidb_core::schema::Schematic {
556        self.0.schematic()
557    }
558}
559
560impl PubSub for BlockingRemoteDatabase {
561    type Subscriber = BlockingRemoteSubscriber;
562
563    fn create_subscriber(&self) -> Result<Self::Subscriber, bonsaidb_core::Error> {
564        let subscriber_id = self.0.client.send_blocking_api_request(&CreateSubscriber {
565            database: self.0.name.to_string(),
566        })?;
567
568        let (sender, receiver) = flume::unbounded();
569        self.0.client.register_subscriber(subscriber_id, sender);
570        Ok(BlockingRemoteSubscriber(AsyncRemoteSubscriber {
571            client: self.0.client.clone(),
572            database: self.0.name.clone(),
573            id: subscriber_id,
574            receiver: Receiver::new(receiver),
575            tokio: None,
576        }))
577    }
578
579    fn publish_bytes(&self, topic: Vec<u8>, payload: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
580        self.0.client.send_blocking_api_request(&Publish {
581            database: self.0.name.to_string(),
582            topic: Bytes::from(topic),
583            payload: Bytes::from(payload),
584        })?;
585        Ok(())
586    }
587
588    fn publish_bytes_to_all(
589        &self,
590        topics: impl IntoIterator<Item = Vec<u8>> + Send,
591        payload: Vec<u8>,
592    ) -> Result<(), bonsaidb_core::Error> {
593        let topics = topics.into_iter().map(Bytes::from).collect();
594        self.0.client.send_blocking_api_request(&PublishToAll {
595            database: self.0.name.to_string(),
596            topics,
597            payload: Bytes::from(payload),
598        })?;
599        Ok(())
600    }
601}
602
603/// A remote PubSub [`Subscriber`] that blocks the current thread when
604/// performing requests.
605#[derive(Debug)]
606pub struct BlockingRemoteSubscriber(AsyncRemoteSubscriber);
607
608impl Subscriber for BlockingRemoteSubscriber {
609    fn subscribe_to_bytes(&self, topic: Vec<u8>) -> Result<(), bonsaidb_core::Error> {
610        self.0.client.send_blocking_api_request(&SubscribeTo {
611            database: self.0.database.to_string(),
612            subscriber_id: self.0.id,
613            topic: Bytes::from(topic),
614        })?;
615        Ok(())
616    }
617
618    fn unsubscribe_from_bytes(&self, topic: &[u8]) -> Result<(), bonsaidb_core::Error> {
619        self.0.client.send_blocking_api_request(&UnsubscribeFrom {
620            database: self.0.database.to_string(),
621            subscriber_id: self.0.id,
622            topic: Bytes::from(topic),
623        })?;
624        Ok(())
625    }
626
627    fn receiver(&self) -> &Receiver {
628        AsyncSubscriber::receiver(&self.0)
629    }
630}
631
632impl KeyValue for BlockingRemoteDatabase {
633    fn execute_key_operation(
634        &self,
635        op: bonsaidb_core::keyvalue::KeyOperation,
636    ) -> Result<bonsaidb_core::keyvalue::Output, bonsaidb_core::Error> {
637        Ok(self
638            .0
639            .client
640            .send_blocking_api_request(&ExecuteKeyOperation {
641                database: self.0.name.to_string(),
642
643                op,
644            })?)
645    }
646}
647
648pub enum Tokio {
649    Runtime(Runtime),
650    Handle(Handle),
651}
652
653impl Tokio {
654    pub fn spawn<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
655        self,
656        task: F,
657    ) -> JoinHandle<Result<(), crate::Error>> {
658        match self {
659            Self::Runtime(tokio) => {
660                // When we have an owned runtime, we must have a thread driving
661                // the runtime. To keep the interface to `Client` simple, we are
662                // going to spawn the task and let the main block_on task simply
663                // wait for the completion event. If the JoinHandle is
664                // cancelled, the sender will be dropped and everything will
665                // clean up.
666                let (completion_sender, completion_receiver) = oneshot::channel();
667                let task = async move {
668                    task.await?;
669                    let _: Result<_, _> = completion_sender.send(());
670                    Ok(())
671                };
672                let task = tokio.spawn(task);
673
674                std::thread::spawn(move || {
675                    tokio.block_on(async move {
676                        let _: Result<_, _> = completion_receiver.await;
677                    });
678                });
679                task
680            }
681            Self::Handle(tokio) => tokio.spawn(task),
682        }
683    }
684}
685
686pub fn spawn_client<F: Future<Output = Result<(), crate::Error>> + Send + 'static>(
687    task: F,
688    handle: Option<Handle>,
689) -> JoinHandle<Result<(), crate::Error>> {
690    // We need to spawn a runtime or
691    let tokio = if let Some(handle) = handle {
692        Tokio::Handle(handle)
693    } else {
694        Tokio::Runtime(Runtime::new().unwrap())
695    };
696    tokio.spawn(task)
697}