1pub mod action;
2pub mod auth;
3#[cfg(feature = "in-use-encryption")]
4pub(crate) mod csfle;
5mod executor;
6pub mod options;
7pub mod session;
8
9use std::{
10 sync::{
11 atomic::{AtomicBool, Ordering},
12 Mutex as SyncMutex,
13 },
14 time::{Duration, Instant},
15};
16
17#[cfg(feature = "in-use-encryption")]
18pub use self::csfle::client_builder::*;
19use derive_where::derive_where;
20use futures_core::Future;
21use futures_util::FutureExt;
22
23#[cfg(feature = "tracing-unstable")]
24use crate::trace::{
25 command::CommandTracingEventEmitter,
26 server_selection::ServerSelectionTracingEventEmitter,
27 trace_or_log_enabled,
28 TracingOrLogLevel,
29 COMMAND_TRACING_EVENT_TARGET,
30};
31use crate::{
32 bson::doc,
33 concern::{ReadConcern, WriteConcern},
34 db::Database,
35 error::{Error, ErrorKind, Result},
36 event::command::CommandEvent,
37 id_set::IdSet,
38 operation::OverrideCriteriaFn,
39 options::{ClientOptions, DatabaseOptions, ReadPreference, SelectionCriteria, ServerAddress},
40 sdam::{
41 server_selection::{self, attempt_to_select_server},
42 SelectedServer,
43 Topology,
44 },
45 tracking_arc::TrackingArc,
46 BoxFuture,
47 ClientSession,
48};
49
50pub(crate) use executor::{HELLO_COMMAND_NAMES, REDACTED_COMMANDS};
51pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
52
53use session::{ServerSession, ServerSessionPool};
54
55const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
56
57#[derive(Debug, Clone)]
115pub struct Client {
116 inner: TrackingArc<ClientInner>,
117}
118
119#[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
120const _: fn() = || {
121 fn assert_send<T: Send>(_t: T) {}
122 fn assert_sync<T: Sync>(_t: T) {}
123
124 let _c: super::Client = todo!();
125 assert_send(_c);
126 assert_sync(_c);
127};
128
129#[derive(Debug)]
130struct ClientInner {
131 topology: Topology,
132 options: ClientOptions,
133 session_pool: ServerSessionPool,
134 shutdown: Shutdown,
135 dropped: AtomicBool,
136 end_sessions_token: std::sync::Mutex<AsyncDropToken>,
137 #[cfg(feature = "in-use-encryption")]
138 csfle: tokio::sync::RwLock<Option<csfle::ClientState>>,
139 #[cfg(test)]
140 disable_command_events: AtomicBool,
141}
142
143#[derive(Debug)]
144struct Shutdown {
145 pending_drops: SyncMutex<IdSet<crate::runtime::AsyncJoinHandle<()>>>,
146 executed: AtomicBool,
147}
148
149impl Client {
150 pub async fn with_uri_str(uri: impl AsRef<str>) -> Result<Self> {
156 let options = ClientOptions::parse(uri.as_ref()).await?;
157
158 Client::with_options(options)
159 }
160
161 pub fn with_options(options: ClientOptions) -> Result<Self> {
163 options.validate()?;
164
165 let (cleanup_tx, cleanup_rx) = tokio::sync::oneshot::channel::<BoxFuture<'static, ()>>();
167 crate::runtime::spawn(async move {
168 if let Ok(cleanup) = cleanup_rx.await {
170 cleanup.await;
171 }
172 });
173 let end_sessions_token = std::sync::Mutex::new(AsyncDropToken {
174 tx: Some(cleanup_tx),
175 });
176
177 let inner = TrackingArc::new(ClientInner {
178 topology: Topology::new(options.clone())?,
179 session_pool: ServerSessionPool::new(),
180 options,
181 shutdown: Shutdown {
182 pending_drops: SyncMutex::new(IdSet::new()),
183 executed: AtomicBool::new(false),
184 },
185 dropped: AtomicBool::new(false),
186 end_sessions_token,
187 #[cfg(feature = "in-use-encryption")]
188 csfle: Default::default(),
189 #[cfg(test)]
190 disable_command_events: AtomicBool::new(false),
191 });
192 Ok(Self { inner })
193 }
194
195 #[cfg(feature = "in-use-encryption")]
217 pub fn encrypted_builder(
218 client_options: ClientOptions,
219 key_vault_namespace: crate::Namespace,
220 kms_providers: impl IntoIterator<
221 Item = (
222 mongocrypt::ctx::KmsProvider,
223 crate::bson::Document,
224 Option<options::TlsOptions>,
225 ),
226 >,
227 ) -> Result<EncryptedClientBuilder> {
228 Ok(EncryptedClientBuilder::new(
229 client_options,
230 csfle::options::AutoEncryptionOptions::new(
231 key_vault_namespace,
232 csfle::options::KmsProviders::new(kms_providers)?,
233 ),
234 ))
235 }
236
237 pub(crate) async fn should_auto_encrypt(&self) -> bool {
239 #[cfg(feature = "in-use-encryption")]
240 {
241 let csfle = self.inner.csfle.read().await;
242 match *csfle {
243 Some(ref csfle) => csfle
244 .opts()
245 .bypass_auto_encryption
246 .map(|b| !b)
247 .unwrap_or(true),
248 None => false,
249 }
250 }
251 #[cfg(not(feature = "in-use-encryption"))]
252 {
253 false
254 }
255 }
256
257 #[cfg(all(test, feature = "in-use-encryption"))]
258 pub(crate) async fn mongocryptd_spawned(&self) -> bool {
259 self.inner
260 .csfle
261 .read()
262 .await
263 .as_ref()
264 .is_some_and(|cs| cs.exec().mongocryptd_spawned())
265 }
266
267 #[cfg(all(test, feature = "in-use-encryption"))]
268 pub(crate) async fn has_mongocryptd_client(&self) -> bool {
269 self.inner
270 .csfle
271 .read()
272 .await
273 .as_ref()
274 .is_some_and(|cs| cs.exec().has_mongocryptd_client())
275 }
276
277 fn test_command_event_channel(&self) -> Option<&options::TestEventSender> {
278 #[cfg(test)]
279 {
280 self.inner
281 .options
282 .test_options
283 .as_ref()
284 .and_then(|t| t.async_event_listener.as_ref())
285 }
286 #[cfg(not(test))]
287 {
288 None
289 }
290 }
291
292 pub(crate) async fn emit_command_event(&self, generate_event: impl FnOnce() -> CommandEvent) {
293 #[cfg(test)]
294 if self
295 .inner
296 .disable_command_events
297 .load(std::sync::atomic::Ordering::SeqCst)
298 {
299 return;
300 }
301 #[cfg(feature = "tracing-unstable")]
302 let tracing_emitter = if trace_or_log_enabled!(
303 target: COMMAND_TRACING_EVENT_TARGET,
304 TracingOrLogLevel::Debug
305 ) {
306 Some(CommandTracingEventEmitter::new(
307 self.inner.options.tracing_max_document_length_bytes,
308 self.inner.topology.id,
309 ))
310 } else {
311 None
312 };
313 let test_channel = self.test_command_event_channel();
314 let should_send = test_channel.is_some() || self.options().command_event_handler.is_some();
315 #[cfg(feature = "tracing-unstable")]
316 let should_send = should_send || tracing_emitter.is_some();
317 if !should_send {
318 return;
319 }
320
321 let event = generate_event();
322 if let Some(tx) = test_channel {
323 let (msg, ack) = crate::runtime::AcknowledgedMessage::package(event.clone());
324 let _ = tx.send(msg).await;
325 ack.wait_for_acknowledgment().await;
326 }
327 #[cfg(feature = "tracing-unstable")]
328 if let Some(ref tracing_emitter) = tracing_emitter {
329 tracing_emitter.handle(event.clone());
330 }
331 if let Some(handler) = &self.options().command_event_handler {
332 handler.handle(event);
333 }
334 }
335
336 pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
338 self.inner.options.selection_criteria.as_ref()
339 }
340
341 pub fn read_concern(&self) -> Option<&ReadConcern> {
343 self.inner.options.read_concern.as_ref()
344 }
345
346 pub fn write_concern(&self) -> Option<&WriteConcern> {
348 self.inner.options.write_concern.as_ref()
349 }
350
351 pub fn database(&self, name: &str) -> Database {
358 Database::new(self.clone(), name, None)
359 }
360
361 pub fn database_with_options(&self, name: &str, options: DatabaseOptions) -> Database {
368 Database::new(self.clone(), name, Some(options))
369 }
370
371 pub fn default_database(&self) -> Option<Database> {
376 self.inner
377 .options
378 .default_database
379 .as_ref()
380 .map(|db_name| self.database(db_name))
381 }
382
383 pub(crate) fn register_async_drop(&self) -> AsyncDropToken {
384 let (cleanup_tx, cleanup_rx) = tokio::sync::oneshot::channel::<BoxFuture<'static, ()>>();
385 let (id_tx, id_rx) = tokio::sync::oneshot::channel::<crate::id_set::Id>();
386 let weak = self.weak();
387 let handle = crate::runtime::spawn(async move {
388 let id = id_rx.await.unwrap();
391 if let Ok(cleanup) = cleanup_rx.await {
393 cleanup.await;
394 }
395 if let Some(client) = weak.upgrade() {
396 client
397 .inner
398 .shutdown
399 .pending_drops
400 .lock()
401 .unwrap()
402 .remove(&id);
403 }
404 });
405 let id = self
406 .inner
407 .shutdown
408 .pending_drops
409 .lock()
410 .unwrap()
411 .insert(handle);
412 let _ = id_tx.send(id);
413 AsyncDropToken {
414 tx: Some(cleanup_tx),
415 }
416 }
417
418 pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
421 let timeout = self.inner.topology.logical_session_timeout();
422 self.inner.session_pool.check_in(session, timeout).await;
423 }
424
425 #[cfg(test)]
426 pub(crate) async fn clear_session_pool(&self) {
427 self.inner.session_pool.clear().await;
428 }
429
430 #[cfg(test)]
431 pub(crate) async fn is_session_checked_in(&self, id: &crate::bson::Document) -> bool {
432 self.inner.session_pool.contains(id).await
433 }
434
435 #[cfg(test)]
436 pub(crate) fn disable_command_events(&self, disable: bool) {
437 self.inner
438 .disable_command_events
439 .store(disable, std::sync::atomic::Ordering::SeqCst);
440 }
441
442 #[cfg(test)]
445 pub(crate) async fn test_select_server(
446 &self,
447 criteria: Option<&SelectionCriteria>,
448 ) -> Result<ServerAddress> {
449 let (server, _) = self
450 .select_server(criteria, "Test select server", None, |_, _| None)
451 .await?;
452 Ok(server.address.clone())
453 }
454
455 async fn select_server(
458 &self,
459 criteria: Option<&SelectionCriteria>,
460 #[allow(unused_variables)] operation_name: &str,
462 deprioritized: Option<&ServerAddress>,
463 override_criteria: OverrideCriteriaFn,
464 ) -> Result<(SelectedServer, SelectionCriteria)> {
465 let criteria =
466 criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));
467
468 let start_time = Instant::now();
469 let timeout = self
470 .inner
471 .options
472 .server_selection_timeout
473 .unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);
474
475 #[cfg(feature = "tracing-unstable")]
476 let event_emitter = ServerSelectionTracingEventEmitter::new(
477 self.inner.topology.id,
478 criteria,
479 operation_name,
480 start_time,
481 timeout,
482 );
483 #[cfg(feature = "tracing-unstable")]
484 event_emitter.emit_started_event(self.inner.topology.watch().observe_latest().description);
485 #[cfg(feature = "tracing-unstable")]
487 let mut emitted_waiting_message = false;
488
489 let mut watcher = self.inner.topology.watch();
490 loop {
491 let state = watcher.observe_latest();
492 let override_slot;
493 let effective_criteria =
494 if let Some(oc) = override_criteria(criteria, &state.description) {
495 override_slot = oc;
496 &override_slot
497 } else {
498 criteria
499 };
500 let result = server_selection::attempt_to_select_server(
501 effective_criteria,
502 &state.description,
503 &state.servers(),
504 deprioritized,
505 );
506 match result {
507 Err(error) => {
508 #[cfg(feature = "tracing-unstable")]
509 event_emitter.emit_failed_event(&state.description, &error);
510
511 return Err(error);
512 }
513 Ok(result) => {
514 if let Some(server) = result {
515 #[cfg(feature = "tracing-unstable")]
516 event_emitter.emit_succeeded_event(&state.description, &server);
517
518 return Ok((server, effective_criteria.clone()));
519 } else {
520 #[cfg(feature = "tracing-unstable")]
521 if !emitted_waiting_message {
522 event_emitter.emit_waiting_event(&state.description);
523 emitted_waiting_message = true;
524 }
525
526 watcher.request_immediate_check();
527
528 let change_occurred = start_time.elapsed() < timeout
529 && watcher
530 .wait_for_update(timeout - start_time.elapsed())
531 .await;
532 if !change_occurred {
533 let error: Error = ErrorKind::ServerSelection {
534 message: state
535 .description
536 .server_selection_timeout_error_message(criteria),
537 }
538 .into();
539
540 #[cfg(feature = "tracing-unstable")]
541 event_emitter.emit_failed_event(&state.description, &error);
542
543 return Err(error);
544 }
545 }
546 }
547 }
548 }
549 }
550
551 #[cfg(all(test, feature = "dns-resolver"))]
552 pub(crate) fn get_hosts(&self) -> Vec<String> {
553 let watcher = self.inner.topology.watch();
554 let state = watcher.peek_latest();
555
556 state
557 .servers()
558 .keys()
559 .map(|stream_address| format!("{stream_address}"))
560 .collect()
561 }
562
563 #[cfg(test)]
564 pub(crate) async fn sync_workers(&self) {
565 self.inner.topology.sync_workers().await;
566 }
567
568 #[cfg(test)]
569 pub(crate) fn topology_description(&self) -> crate::sdam::TopologyDescription {
570 self.inner
571 .topology
572 .watch()
573 .peek_latest()
574 .description
575 .clone()
576 }
577
578 #[cfg(test)]
579 pub(crate) fn topology(&self) -> &Topology {
580 &self.inner.topology
581 }
582
583 #[cfg(feature = "in-use-encryption")]
584 pub(crate) async fn primary_description(&self) -> Option<crate::sdam::ServerDescription> {
585 let start_time = Instant::now();
586 let timeout = self
587 .inner
588 .options
589 .server_selection_timeout
590 .unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);
591 let mut watcher = self.inner.topology.watch();
592 loop {
593 let topology = watcher.observe_latest();
594 if let Some(desc) = topology.description.primary() {
595 return Some(desc.clone());
596 }
597 if !watcher
598 .wait_for_update(timeout - start_time.elapsed())
599 .await
600 {
601 return None;
602 }
603 }
604 }
605
606 pub(crate) fn weak(&self) -> WeakClient {
607 WeakClient {
608 inner: TrackingArc::downgrade(&self.inner),
609 }
610 }
611
612 #[cfg(feature = "in-use-encryption")]
613 pub(crate) async fn auto_encryption_opts(
614 &self,
615 ) -> Option<tokio::sync::RwLockReadGuard<'_, csfle::options::AutoEncryptionOptions>> {
616 tokio::sync::RwLockReadGuard::try_map(self.inner.csfle.read().await, |csfle| {
617 csfle.as_ref().map(|cs| cs.opts())
618 })
619 .ok()
620 }
621
622 pub(crate) fn options(&self) -> &ClientOptions {
623 &self.inner.options
624 }
625
626 pub(crate) async fn end_all_sessions(&self) {
628 const MAX_END_SESSIONS_BATCH_SIZE: usize = 10_000;
630
631 let mut watcher = self.inner.topology.watch();
632 let selection_criteria =
633 SelectionCriteria::from(ReadPreference::PrimaryPreferred { options: None });
634
635 let session_ids = self.inner.session_pool.get_session_ids().await;
636 for chunk in session_ids.chunks(MAX_END_SESSIONS_BATCH_SIZE) {
637 let state = watcher.observe_latest();
638 let Ok(Some(_)) = attempt_to_select_server(
639 &selection_criteria,
640 &state.description,
641 &state.servers(),
642 None,
643 ) else {
644 return;
647 };
648
649 let end_sessions = doc! {
650 "endSessions": chunk,
651 };
652 let _ = self
653 .database("admin")
654 .run_command(end_sessions)
655 .selection_criteria(selection_criteria.clone())
656 .await;
657 }
658 }
659}
660
661#[derive(Clone, Debug)]
662pub(crate) struct WeakClient {
663 inner: crate::tracking_arc::Weak<ClientInner>,
664}
665
666impl WeakClient {
667 pub(crate) fn upgrade(&self) -> Option<Client> {
668 self.inner.upgrade().map(|inner| Client { inner })
669 }
670}
671
672#[derive_where(Debug)]
673pub(crate) struct AsyncDropToken {
674 #[derive_where(skip)]
675 tx: Option<tokio::sync::oneshot::Sender<BoxFuture<'static, ()>>>,
676}
677
678impl AsyncDropToken {
679 pub(crate) fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
680 if let Some(tx) = self.tx.take() {
681 let _ = tx.send(fut.boxed());
682 } else {
683 #[cfg(debug_assertions)]
684 panic!("exhausted AsyncDropToken");
685 }
686 }
687
688 pub(crate) fn take(&mut self) -> Self {
689 Self { tx: self.tx.take() }
690 }
691}
692
693impl Drop for Client {
694 fn drop(&mut self) {
695 if !self.inner.shutdown.executed.load(Ordering::SeqCst)
696 && !self.inner.dropped.load(Ordering::SeqCst)
697 && TrackingArc::strong_count(&self.inner) == 1
698 {
699 self.inner.dropped.store(true, Ordering::SeqCst);
706 let client = self.clone();
707 self.inner
708 .end_sessions_token
709 .lock()
710 .unwrap()
711 .spawn(async move {
712 client.end_all_sessions().await;
713 });
714 }
715 }
716}