mongodb/
client.rs

1pub mod action;
2pub mod auth;
3#[cfg(feature = "in-use-encryption")]
4pub(crate) mod csfle;
5mod executor;
6pub mod options;
7pub mod session;
8
9use std::{
10    sync::{
11        atomic::{AtomicBool, Ordering},
12        Mutex as SyncMutex,
13    },
14    time::{Duration, Instant},
15};
16
17#[cfg(feature = "in-use-encryption")]
18pub use self::csfle::client_builder::*;
19use derive_where::derive_where;
20use futures_core::Future;
21use futures_util::FutureExt;
22
23#[cfg(feature = "tracing-unstable")]
24use crate::trace::{
25    command::CommandTracingEventEmitter,
26    server_selection::ServerSelectionTracingEventEmitter,
27    trace_or_log_enabled,
28    TracingOrLogLevel,
29    COMMAND_TRACING_EVENT_TARGET,
30};
31use crate::{
32    bson::doc,
33    concern::{ReadConcern, WriteConcern},
34    db::Database,
35    error::{Error, ErrorKind, Result},
36    event::command::CommandEvent,
37    id_set::IdSet,
38    operation::OverrideCriteriaFn,
39    options::{ClientOptions, DatabaseOptions, ReadPreference, SelectionCriteria, ServerAddress},
40    sdam::{
41        server_selection::{self, attempt_to_select_server},
42        SelectedServer,
43        Topology,
44    },
45    tracking_arc::TrackingArc,
46    BoxFuture,
47    ClientSession,
48};
49
50pub(crate) use executor::{HELLO_COMMAND_NAMES, REDACTED_COMMANDS};
51pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
52
53use session::{ServerSession, ServerSessionPool};
54
55const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
56
57/// This is the main entry point for the API. A `Client` is used to connect to a MongoDB cluster.
58/// By default, it will monitor the topology of the cluster, keeping track of any changes, such
59/// as servers being added or removed.
60///
61/// `Client` uses [`std::sync::Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html) internally,
62/// so it can safely be shared across threads or async tasks. For example:
63///
64/// ```rust
65/// # use mongodb::{bson::Document, Client, error::Result};
66/// #
67/// # async fn start_workers() -> Result<()> {
68/// let client = Client::with_uri_str("mongodb://example.com").await?;
69///
70/// for i in 0..5 {
71///     let client_ref = client.clone();
72///
73///     tokio::task::spawn(async move {
74///         let collection = client_ref.database("items").collection::<Document>(&format!("coll{}", i));
75///
76///         // Do something with the collection
77///     });
78/// }
79/// #
80/// # Ok(())
81/// # }
82/// ```
83/// ## Notes on performance
84/// Spawning many asynchronous tasks that use the driver concurrently like this is often the best
85/// way to achieve maximum performance, as the driver is designed to work well in such situations.
86///
87/// Additionally, using a custom Rust type that implements `Serialize` and `Deserialize` as the
88/// generic parameter of [`Collection`](../struct.Collection.html) instead of
89/// [`Document`](crate::bson::Document) can reduce the amount of time the driver and your
90/// application spends serializing and deserializing BSON, which can also lead to increased
91/// performance.
92///
93/// ## TCP Keepalive
94/// TCP keepalive is enabled by default with ``tcp_keepalive_time`` set to 120 seconds. The
95/// driver does not set ``tcp_keepalive_intvl``. See the
96/// [MongoDB Diagnostics FAQ keepalive section](https://www.mongodb.com/docs/manual/faq/diagnostics/#does-tcp-keepalive-time-affect-mongodb-deployments)
97/// for instructions on setting these values at the system level.
98///
99/// ## Clean shutdown
100/// Because Rust has no async equivalent of `Drop`, values that require server-side cleanup when
101/// dropped spawn a new async task to perform that cleanup.  This can cause two potential issues:
102///
103/// * Drop tasks pending or in progress when the async runtime shuts down may not complete, causing
104///   server-side resources to not be freed.
105/// * Drop tasks may run at an arbitrary time even after no `Client` values exist, making it hard to
106///   reason about associated resources (e.g. event handlers).
107///
108/// To address these issues, we highly recommend you use [`Client::shutdown`] in the termination
109/// path of your application.  This will ensure that outstanding resources have been cleaned up and
110/// terminate internal worker tasks before returning.  Please note that `shutdown` will wait for
111/// _all_ outstanding resource handles to be dropped, so they must either have been dropped before
112/// calling `shutdown` or in a concurrent task; see the documentation of `shutdown` for more
113/// details.
114#[derive(Debug, Clone)]
115pub struct Client {
116    inner: TrackingArc<ClientInner>,
117}
118
119#[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
120const _: fn() = || {
121    fn assert_send<T: Send>(_t: T) {}
122    fn assert_sync<T: Sync>(_t: T) {}
123
124    let _c: super::Client = todo!();
125    assert_send(_c);
126    assert_sync(_c);
127};
128
129#[derive(Debug)]
130struct ClientInner {
131    topology: Topology,
132    options: ClientOptions,
133    session_pool: ServerSessionPool,
134    shutdown: Shutdown,
135    dropped: AtomicBool,
136    end_sessions_token: std::sync::Mutex<AsyncDropToken>,
137    #[cfg(feature = "in-use-encryption")]
138    csfle: tokio::sync::RwLock<Option<csfle::ClientState>>,
139    #[cfg(test)]
140    disable_command_events: AtomicBool,
141}
142
143#[derive(Debug)]
144struct Shutdown {
145    pending_drops: SyncMutex<IdSet<crate::runtime::AsyncJoinHandle<()>>>,
146    executed: AtomicBool,
147}
148
149impl Client {
150    /// Creates a new `Client` connected to the cluster specified by `uri`. `uri` must be a valid
151    /// MongoDB connection string.
152    ///
153    /// See the documentation on
154    /// [`ClientOptions::parse`](options/struct.ClientOptions.html#method.parse) for more details.
155    pub async fn with_uri_str(uri: impl AsRef<str>) -> Result<Self> {
156        let options = ClientOptions::parse(uri.as_ref()).await?;
157
158        Client::with_options(options)
159    }
160
161    /// Creates a new `Client` connected to the cluster specified by `options`.
162    pub fn with_options(options: ClientOptions) -> Result<Self> {
163        options.validate()?;
164
165        // Spawn a cleanup task, similar to register_async_drop
166        let (cleanup_tx, cleanup_rx) = tokio::sync::oneshot::channel::<BoxFuture<'static, ()>>();
167        crate::runtime::spawn(async move {
168            // If the cleanup channel is closed, that task was dropped.
169            if let Ok(cleanup) = cleanup_rx.await {
170                cleanup.await;
171            }
172        });
173        let end_sessions_token = std::sync::Mutex::new(AsyncDropToken {
174            tx: Some(cleanup_tx),
175        });
176
177        let inner = TrackingArc::new(ClientInner {
178            topology: Topology::new(options.clone())?,
179            session_pool: ServerSessionPool::new(),
180            options,
181            shutdown: Shutdown {
182                pending_drops: SyncMutex::new(IdSet::new()),
183                executed: AtomicBool::new(false),
184            },
185            dropped: AtomicBool::new(false),
186            end_sessions_token,
187            #[cfg(feature = "in-use-encryption")]
188            csfle: Default::default(),
189            #[cfg(test)]
190            disable_command_events: AtomicBool::new(false),
191        });
192        Ok(Self { inner })
193    }
194
195    /// Return an `EncryptedClientBuilder` for constructing a `Client` with auto-encryption enabled.
196    ///
197    /// ```no_run
198    /// # use mongocrypt::ctx::KmsProvider;
199    /// # use mongodb::{Client, bson::{self, doc}, error::Result};
200    /// # async fn func() -> Result<()> {
201    /// # let client_options = todo!();
202    /// # let key_vault_namespace = todo!();
203    /// # let key_vault_client: Client = todo!();
204    /// # let local_key: bson::Binary = todo!();
205    /// let encrypted_client = Client::encrypted_builder(
206    ///     client_options,
207    ///     key_vault_namespace,
208    ///     [(KmsProvider::local(), doc! { "key": local_key }, None)],
209    /// )?
210    /// .key_vault_client(key_vault_client)
211    /// .build()
212    /// .await?;
213    /// # Ok(())
214    /// # }
215    /// ```
216    #[cfg(feature = "in-use-encryption")]
217    pub fn encrypted_builder(
218        client_options: ClientOptions,
219        key_vault_namespace: crate::Namespace,
220        kms_providers: impl IntoIterator<
221            Item = (
222                mongocrypt::ctx::KmsProvider,
223                crate::bson::Document,
224                Option<options::TlsOptions>,
225            ),
226        >,
227    ) -> Result<EncryptedClientBuilder> {
228        Ok(EncryptedClientBuilder::new(
229            client_options,
230            csfle::options::AutoEncryptionOptions::new(
231                key_vault_namespace,
232                csfle::options::KmsProviders::new(kms_providers)?,
233            ),
234        ))
235    }
236
237    /// Whether commands sent via this client should be auto-encrypted.
238    pub(crate) async fn should_auto_encrypt(&self) -> bool {
239        #[cfg(feature = "in-use-encryption")]
240        {
241            let csfle = self.inner.csfle.read().await;
242            match *csfle {
243                Some(ref csfle) => csfle
244                    .opts()
245                    .bypass_auto_encryption
246                    .map(|b| !b)
247                    .unwrap_or(true),
248                None => false,
249            }
250        }
251        #[cfg(not(feature = "in-use-encryption"))]
252        {
253            false
254        }
255    }
256
257    #[cfg(all(test, feature = "in-use-encryption"))]
258    pub(crate) async fn mongocryptd_spawned(&self) -> bool {
259        self.inner
260            .csfle
261            .read()
262            .await
263            .as_ref()
264            .is_some_and(|cs| cs.exec().mongocryptd_spawned())
265    }
266
267    #[cfg(all(test, feature = "in-use-encryption"))]
268    pub(crate) async fn has_mongocryptd_client(&self) -> bool {
269        self.inner
270            .csfle
271            .read()
272            .await
273            .as_ref()
274            .is_some_and(|cs| cs.exec().has_mongocryptd_client())
275    }
276
277    fn test_command_event_channel(&self) -> Option<&options::TestEventSender> {
278        #[cfg(test)]
279        {
280            self.inner
281                .options
282                .test_options
283                .as_ref()
284                .and_then(|t| t.async_event_listener.as_ref())
285        }
286        #[cfg(not(test))]
287        {
288            None
289        }
290    }
291
292    pub(crate) async fn emit_command_event(&self, generate_event: impl FnOnce() -> CommandEvent) {
293        #[cfg(test)]
294        if self
295            .inner
296            .disable_command_events
297            .load(std::sync::atomic::Ordering::SeqCst)
298        {
299            return;
300        }
301        #[cfg(feature = "tracing-unstable")]
302        let tracing_emitter = if trace_or_log_enabled!(
303            target: COMMAND_TRACING_EVENT_TARGET,
304            TracingOrLogLevel::Debug
305        ) {
306            Some(CommandTracingEventEmitter::new(
307                self.inner.options.tracing_max_document_length_bytes,
308                self.inner.topology.id,
309            ))
310        } else {
311            None
312        };
313        let test_channel = self.test_command_event_channel();
314        let should_send = test_channel.is_some() || self.options().command_event_handler.is_some();
315        #[cfg(feature = "tracing-unstable")]
316        let should_send = should_send || tracing_emitter.is_some();
317        if !should_send {
318            return;
319        }
320
321        let event = generate_event();
322        if let Some(tx) = test_channel {
323            let (msg, ack) = crate::runtime::AcknowledgedMessage::package(event.clone());
324            let _ = tx.send(msg).await;
325            ack.wait_for_acknowledgment().await;
326        }
327        #[cfg(feature = "tracing-unstable")]
328        if let Some(ref tracing_emitter) = tracing_emitter {
329            tracing_emitter.handle(event.clone());
330        }
331        if let Some(handler) = &self.options().command_event_handler {
332            handler.handle(event);
333        }
334    }
335
336    /// Gets the default selection criteria the `Client` uses for operations..
337    pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
338        self.inner.options.selection_criteria.as_ref()
339    }
340
341    /// Gets the default read concern the `Client` uses for operations.
342    pub fn read_concern(&self) -> Option<&ReadConcern> {
343        self.inner.options.read_concern.as_ref()
344    }
345
346    /// Gets the default write concern the `Client` uses for operations.
347    pub fn write_concern(&self) -> Option<&WriteConcern> {
348        self.inner.options.write_concern.as_ref()
349    }
350
351    /// Gets a handle to a database specified by `name` in the cluster the `Client` is connected to.
352    /// The `Database` options (e.g. read preference and write concern) will default to those of the
353    /// `Client`.
354    ///
355    /// This method does not send or receive anything across the wire to the database, so it can be
356    /// used repeatedly without incurring any costs from I/O.
357    pub fn database(&self, name: &str) -> Database {
358        Database::new(self.clone(), name, None)
359    }
360
361    /// Gets a handle to a database specified by `name` in the cluster the `Client` is connected to.
362    /// Operations done with this `Database` will use the options specified by `options` by default
363    /// and will otherwise default to those of the `Client`.
364    ///
365    /// This method does not send or receive anything across the wire to the database, so it can be
366    /// used repeatedly without incurring any costs from I/O.
367    pub fn database_with_options(&self, name: &str, options: DatabaseOptions) -> Database {
368        Database::new(self.clone(), name, Some(options))
369    }
370
371    /// Gets a handle to the default database specified in the `ClientOptions` or MongoDB connection
372    /// string used to construct this `Client`.
373    ///
374    /// If no default database was specified, `None` will be returned.
375    pub fn default_database(&self) -> Option<Database> {
376        self.inner
377            .options
378            .default_database
379            .as_ref()
380            .map(|db_name| self.database(db_name))
381    }
382
383    pub(crate) fn register_async_drop(&self) -> AsyncDropToken {
384        let (cleanup_tx, cleanup_rx) = tokio::sync::oneshot::channel::<BoxFuture<'static, ()>>();
385        let (id_tx, id_rx) = tokio::sync::oneshot::channel::<crate::id_set::Id>();
386        let weak = self.weak();
387        let handle = crate::runtime::spawn(async move {
388            // Unwrap safety: the id is sent immediately after task creation, with no
389            // await points in between.
390            let id = id_rx.await.unwrap();
391            // If the cleanup channel is closed, that task was dropped.
392            if let Ok(cleanup) = cleanup_rx.await {
393                cleanup.await;
394            }
395            if let Some(client) = weak.upgrade() {
396                client
397                    .inner
398                    .shutdown
399                    .pending_drops
400                    .lock()
401                    .unwrap()
402                    .remove(&id);
403            }
404        });
405        let id = self
406            .inner
407            .shutdown
408            .pending_drops
409            .lock()
410            .unwrap()
411            .insert(handle);
412        let _ = id_tx.send(id);
413        AsyncDropToken {
414            tx: Some(cleanup_tx),
415        }
416    }
417
418    /// Check in a server session to the server session pool. The session will be discarded if it is
419    /// expired or dirty.
420    pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
421        let timeout = self.inner.topology.logical_session_timeout();
422        self.inner.session_pool.check_in(session, timeout).await;
423    }
424
425    #[cfg(test)]
426    pub(crate) async fn clear_session_pool(&self) {
427        self.inner.session_pool.clear().await;
428    }
429
430    #[cfg(test)]
431    pub(crate) async fn is_session_checked_in(&self, id: &crate::bson::Document) -> bool {
432        self.inner.session_pool.contains(id).await
433    }
434
435    #[cfg(test)]
436    pub(crate) fn disable_command_events(&self, disable: bool) {
437        self.inner
438            .disable_command_events
439            .store(disable, std::sync::atomic::Ordering::SeqCst);
440    }
441
442    /// Get the address of the server selected according to the given criteria.
443    /// This method is only used in tests.
444    #[cfg(test)]
445    pub(crate) async fn test_select_server(
446        &self,
447        criteria: Option<&SelectionCriteria>,
448    ) -> Result<ServerAddress> {
449        let (server, _) = self
450            .select_server(criteria, "Test select server", None, |_, _| None)
451            .await?;
452        Ok(server.address.clone())
453    }
454
455    /// Select a server using the provided criteria. If none is provided, a primary read preference
456    /// will be used instead.
457    async fn select_server(
458        &self,
459        criteria: Option<&SelectionCriteria>,
460        #[allow(unused_variables)] // we only use the operation_name for tracing.
461        operation_name: &str,
462        deprioritized: Option<&ServerAddress>,
463        override_criteria: OverrideCriteriaFn,
464    ) -> Result<(SelectedServer, SelectionCriteria)> {
465        let criteria =
466            criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
467
468        let start_time = Instant::now();
469        let timeout = self
470            .inner
471            .options
472            .server_selection_timeout
473            .unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);
474
475        #[cfg(feature = "tracing-unstable")]
476        let event_emitter = ServerSelectionTracingEventEmitter::new(
477            self.inner.topology.id,
478            criteria,
479            operation_name,
480            start_time,
481            timeout,
482        );
483        #[cfg(feature = "tracing-unstable")]
484        event_emitter.emit_started_event(self.inner.topology.watch().observe_latest().description);
485        // We only want to emit this message once per operation at most.
486        #[cfg(feature = "tracing-unstable")]
487        let mut emitted_waiting_message = false;
488
489        let mut watcher = self.inner.topology.watch();
490        loop {
491            let state = watcher.observe_latest();
492            let override_slot;
493            let effective_criteria =
494                if let Some(oc) = override_criteria(criteria, &state.description) {
495                    override_slot = oc;
496                    &override_slot
497                } else {
498                    criteria
499                };
500            let result = server_selection::attempt_to_select_server(
501                effective_criteria,
502                &state.description,
503                &state.servers(),
504                deprioritized,
505            );
506            match result {
507                Err(error) => {
508                    #[cfg(feature = "tracing-unstable")]
509                    event_emitter.emit_failed_event(&state.description, &error);
510
511                    return Err(error);
512                }
513                Ok(result) => {
514                    if let Some(server) = result {
515                        #[cfg(feature = "tracing-unstable")]
516                        event_emitter.emit_succeeded_event(&state.description, &server);
517
518                        return Ok((server, effective_criteria.clone()));
519                    } else {
520                        #[cfg(feature = "tracing-unstable")]
521                        if !emitted_waiting_message {
522                            event_emitter.emit_waiting_event(&state.description);
523                            emitted_waiting_message = true;
524                        }
525
526                        watcher.request_immediate_check();
527
528                        let change_occurred = start_time.elapsed() < timeout
529                            && watcher
530                                .wait_for_update(timeout - start_time.elapsed())
531                                .await;
532                        if !change_occurred {
533                            let error: Error = ErrorKind::ServerSelection {
534                                message: state
535                                    .description
536                                    .server_selection_timeout_error_message(criteria),
537                            }
538                            .into();
539
540                            #[cfg(feature = "tracing-unstable")]
541                            event_emitter.emit_failed_event(&state.description, &error);
542
543                            return Err(error);
544                        }
545                    }
546                }
547            }
548        }
549    }
550
551    #[cfg(all(test, feature = "dns-resolver"))]
552    pub(crate) fn get_hosts(&self) -> Vec<String> {
553        let watcher = self.inner.topology.watch();
554        let state = watcher.peek_latest();
555
556        state
557            .servers()
558            .keys()
559            .map(|stream_address| format!("{stream_address}"))
560            .collect()
561    }
562
563    #[cfg(test)]
564    pub(crate) async fn sync_workers(&self) {
565        self.inner.topology.sync_workers().await;
566    }
567
568    #[cfg(test)]
569    pub(crate) fn topology_description(&self) -> crate::sdam::TopologyDescription {
570        self.inner
571            .topology
572            .watch()
573            .peek_latest()
574            .description
575            .clone()
576    }
577
578    #[cfg(test)]
579    pub(crate) fn topology(&self) -> &Topology {
580        &self.inner.topology
581    }
582
583    #[cfg(feature = "in-use-encryption")]
584    pub(crate) async fn primary_description(&self) -> Option<crate::sdam::ServerDescription> {
585        let start_time = Instant::now();
586        let timeout = self
587            .inner
588            .options
589            .server_selection_timeout
590            .unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);
591        let mut watcher = self.inner.topology.watch();
592        loop {
593            let topology = watcher.observe_latest();
594            if let Some(desc) = topology.description.primary() {
595                return Some(desc.clone());
596            }
597            if !watcher
598                .wait_for_update(timeout - start_time.elapsed())
599                .await
600            {
601                return None;
602            }
603        }
604    }
605
606    pub(crate) fn weak(&self) -> WeakClient {
607        WeakClient {
608            inner: TrackingArc::downgrade(&self.inner),
609        }
610    }
611
612    #[cfg(feature = "in-use-encryption")]
613    pub(crate) async fn auto_encryption_opts(
614        &self,
615    ) -> Option<tokio::sync::RwLockReadGuard<'_, csfle::options::AutoEncryptionOptions>> {
616        tokio::sync::RwLockReadGuard::try_map(self.inner.csfle.read().await, |csfle| {
617            csfle.as_ref().map(|cs| cs.opts())
618        })
619        .ok()
620    }
621
622    pub(crate) fn options(&self) -> &ClientOptions {
623        &self.inner.options
624    }
625
626    /// Ends all sessions contained in this client's session pool on the server.
627    pub(crate) async fn end_all_sessions(&self) {
628        // The maximum number of session IDs that should be sent in a single endSessions command.
629        const MAX_END_SESSIONS_BATCH_SIZE: usize = 10_000;
630
631        let mut watcher = self.inner.topology.watch();
632        let selection_criteria =
633            SelectionCriteria::from(ReadPreference::PrimaryPreferred { options: None });
634
635        let session_ids = self.inner.session_pool.get_session_ids().await;
636        for chunk in session_ids.chunks(MAX_END_SESSIONS_BATCH_SIZE) {
637            let state = watcher.observe_latest();
638            let Ok(Some(_)) = attempt_to_select_server(
639                &selection_criteria,
640                &state.description,
641                &state.servers(),
642                None,
643            ) else {
644                // If a suitable server is not available, do not proceed with the operation to avoid
645                // spinning for server_selection_timeout.
646                return;
647            };
648
649            let end_sessions = doc! {
650                "endSessions": chunk,
651            };
652            let _ = self
653                .database("admin")
654                .run_command(end_sessions)
655                .selection_criteria(selection_criteria.clone())
656                .await;
657        }
658    }
659}
660
661#[derive(Clone, Debug)]
662pub(crate) struct WeakClient {
663    inner: crate::tracking_arc::Weak<ClientInner>,
664}
665
666impl WeakClient {
667    pub(crate) fn upgrade(&self) -> Option<Client> {
668        self.inner.upgrade().map(|inner| Client { inner })
669    }
670}
671
672#[derive_where(Debug)]
673pub(crate) struct AsyncDropToken {
674    #[derive_where(skip)]
675    tx: Option<tokio::sync::oneshot::Sender<BoxFuture<'static, ()>>>,
676}
677
678impl AsyncDropToken {
679    pub(crate) fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
680        if let Some(tx) = self.tx.take() {
681            let _ = tx.send(fut.boxed());
682        } else {
683            #[cfg(debug_assertions)]
684            panic!("exhausted AsyncDropToken");
685        }
686    }
687
688    pub(crate) fn take(&mut self) -> Self {
689        Self { tx: self.tx.take() }
690    }
691}
692
693impl Drop for Client {
694    fn drop(&mut self) {
695        if !self.inner.shutdown.executed.load(Ordering::SeqCst)
696            && !self.inner.dropped.load(Ordering::SeqCst)
697            && TrackingArc::strong_count(&self.inner) == 1
698        {
699            // We need an owned copy of the client to move into the spawned future. However, if this
700            // call to drop completes before the spawned future completes, the number of strong
701            // references to the inner client will again be 1 when the cloned client drops, and thus
702            // end_all_sessions will be called continuously until the runtime shuts down. Storing a
703            // flag indicating whether end_all_sessions has already been called breaks
704            // this cycle.
705            self.inner.dropped.store(true, Ordering::SeqCst);
706            let client = self.clone();
707            self.inner
708                .end_sessions_token
709                .lock()
710                .unwrap()
711                .spawn(async move {
712                    client.end_all_sessions().await;
713                });
714        }
715    }
716}