mongodb/event/
cmap.rs

1//! Contains the events and functionality for monitoring behavior of the connection pooling of a
2//! `Client`.
3
4use std::time::Duration;
5
6use derive_more::From;
7#[cfg(feature = "tracing-unstable")]
8use derive_where::derive_where;
9use serde::{Deserialize, Serialize};
10
11use crate::{bson::oid::ObjectId, options::ServerAddress, serde_util};
12
13#[cfg(feature = "tracing-unstable")]
14use crate::trace::{
15    connection::ConnectionTracingEventEmitter,
16    trace_or_log_enabled,
17    TracingOrLogLevel,
18    CONNECTION_TRACING_EVENT_TARGET,
19};
20
21use super::EventHandler;
22
23/// We implement `Deserialize` for all of the event types so that we can more easily parse the CMAP
24/// spec tests. However, we have no need to parse the address field from the JSON files (if it's
25/// even present). To facilitate populating the address field with an empty value when
26/// deserializing, we define a private `empty_address` function that the events can specify as the
27/// custom deserialization value for each address field.
28fn empty_address() -> ServerAddress {
29    ServerAddress::Tcp {
30        host: Default::default(),
31        port: None,
32    }
33}
34
35/// Event emitted when a connection pool is created.
36#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
37#[non_exhaustive]
38pub struct PoolCreatedEvent {
39    /// The address of the server that the pool's connections will connect to.
40    #[serde(default = "self::empty_address")]
41    #[serde(skip_deserializing)]
42    pub address: ServerAddress,
43
44    /// The options used for the pool.
45    pub options: Option<ConnectionPoolOptions>,
46}
47
48/// Contains the options for creating a connection pool. While these options are specified at the
49/// client-level, `ConnectionPoolOptions` is exposed for the purpose of CMAP event handling.
50#[derive(Clone, Default, Deserialize, Debug, PartialEq, Serialize)]
51#[serde(rename_all = "camelCase")]
52#[non_exhaustive]
53pub struct ConnectionPoolOptions {
54    /// Connections that have been ready for usage in the pool for longer than `max_idle_time` will
55    /// not be used.
56    ///
57    /// The default is that connections will not be closed due to being idle.
58    #[serde(rename = "maxIdleTimeMS")]
59    #[serde(default)]
60    #[serde(deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis")]
61    pub max_idle_time: Option<Duration>,
62
63    /// The maximum number of connections that the pool can have at a given time. This includes
64    /// connections which are currently checked out of the pool.
65    ///
66    /// The default is 10.
67    pub max_pool_size: Option<u32>,
68
69    /// The minimum number of connections that the pool can have at a given time. This includes
70    /// connections which are currently checked out of the pool. If fewer than `min_pool_size`
71    /// connections are in the pool, connections will be added to the pool in the background.
72    ///
73    /// The default is that no minimum is enforced
74    pub min_pool_size: Option<u32>,
75}
76
77/// Event emitted when a connection pool becomes ready.
78#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
79#[non_exhaustive]
80pub struct PoolReadyEvent {
81    /// The address of the server that the pool's connections will connect to.
82    #[serde(default = "self::empty_address")]
83    #[serde(skip_deserializing)]
84    pub address: ServerAddress,
85}
86
87/// Event emitted when a connection pool is cleared.
88#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
89#[non_exhaustive]
90pub struct PoolClearedEvent {
91    /// The address of the server that the pool's connections will connect to.
92    #[serde(default = "self::empty_address")]
93    #[serde(skip_deserializing)]
94    pub address: ServerAddress,
95
96    /// If the connection is to a load balancer, the id of the selected backend.
97    pub service_id: Option<ObjectId>,
98
99    /// Whether in-use connections were interrupted when the pool cleared.
100    #[serde(default)]
101    pub interrupt_in_use_connections: bool,
102}
103
104/// Event emitted when a connection pool is cleared.
105#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
106#[non_exhaustive]
107pub struct PoolClosedEvent {
108    /// The address of the server that the pool's connections will connect to.
109    #[serde(default = "self::empty_address")]
110    #[serde(skip_deserializing)]
111    pub address: ServerAddress,
112}
113
114/// Event emitted when a connection is created.
115#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
116#[serde(rename_all = "camelCase")]
117#[non_exhaustive]
118pub struct ConnectionCreatedEvent {
119    /// The address of the server that the connection will connect to.
120    #[serde(default = "self::empty_address")]
121    #[serde(skip_deserializing)]
122    pub address: ServerAddress,
123
124    /// The unique ID of the connection. This is not used for anything internally, but can be used
125    /// to identify other events related to this connection.
126    #[serde(default = "default_connection_id")]
127    pub connection_id: u32,
128}
129
130/// Event emitted when a connection is ready to be used. This indicates that all the necessary
131/// prerequisites for using a connection (handshake, authentication, etc.) have been completed.
132#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
133#[serde(rename_all = "camelCase")]
134#[non_exhaustive]
135pub struct ConnectionReadyEvent {
136    /// The address of the server that the connection is connected to.
137    #[serde(default = "self::empty_address")]
138    #[serde(skip_deserializing)]
139    pub address: ServerAddress,
140
141    /// The unique ID of the connection. This is not used for anything internally, but can be used
142    /// to identify other events related to this connection.
143    #[serde(default = "default_connection_id")]
144    pub connection_id: u32,
145
146    /// The time it took to establish the connection.
147    #[serde(skip_deserializing)]
148    pub duration: Duration,
149}
150
151/// Event emitted when a connection is closed.
152#[derive(Clone, Debug, Deserialize, Serialize)]
153#[cfg_attr(feature = "tracing-unstable", derive_where(PartialEq))]
154#[cfg_attr(not(feature = "tracing-unstable"), derive(PartialEq))]
155#[serde(rename_all = "camelCase")]
156#[non_exhaustive]
157pub struct ConnectionClosedEvent {
158    /// The address of the server that the connection was connected to.
159    #[serde(default = "self::empty_address")]
160    #[serde(skip_deserializing)]
161    pub address: ServerAddress,
162
163    /// The unique ID of the connection. This is not used for anything internally, but can be used
164    /// to identify other events related to this connection.
165    #[serde(default)]
166    pub connection_id: u32,
167
168    /// The reason that the connection was closed.
169    #[cfg_attr(test, serde(default = "unset_connection_closed_reason"))]
170    pub reason: ConnectionClosedReason,
171
172    /// If the `reason` connection checkout failed was `Error`,the associated
173    /// error is contained here. This is attached so we can include it in log messages;
174    /// in future work we may add this to public API on the event itself. TODO: DRIVERS-2495
175    #[cfg(feature = "tracing-unstable")]
176    #[serde(skip)]
177    #[derive_where(skip)]
178    pub(crate) error: Option<crate::error::Error>,
179}
180
181/// The reasons that a connection may be closed.
182#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
183#[serde(rename_all = "camelCase")]
184#[non_exhaustive]
185pub enum ConnectionClosedReason {
186    /// The connection pool has been cleared since the connection was created.
187    Stale,
188
189    /// The connection has been available for longer than `max_idle_time` without being used.
190    Idle,
191
192    /// An error occurred while using the connection.
193    Error,
194
195    /// The connection was dropped during read or write.
196    Dropped,
197
198    /// The pool that the connection belongs to has been closed.
199    PoolClosed,
200
201    #[cfg(test)]
202    /// The value was not set in the test file.
203    Unset,
204}
205
206#[cfg(test)]
207fn unset_connection_closed_reason() -> ConnectionClosedReason {
208    ConnectionClosedReason::Unset
209}
210
211/// Event emitted when a thread begins checking out a connection to use for an operation.
212#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
213#[non_exhaustive]
214pub struct ConnectionCheckoutStartedEvent {
215    /// The address of the server that the connection will connect to.
216    #[serde(default = "self::empty_address")]
217    #[serde(skip_deserializing)]
218    pub address: ServerAddress,
219}
220
221/// Event emitted when a thread is unable to check out a connection.
222#[derive(Clone, Debug, Deserialize, Serialize)]
223#[cfg_attr(feature = "tracing-unstable", derive_where(PartialEq))]
224#[cfg_attr(not(feature = "tracing-unstable"), derive(PartialEq))]
225#[non_exhaustive]
226pub struct ConnectionCheckoutFailedEvent {
227    /// The address of the server that the connection would have connected to.
228    #[serde(default = "self::empty_address")]
229    #[serde(skip_deserializing)]
230    pub address: ServerAddress,
231
232    /// The reason a connection was unable to be checked out.
233    #[cfg_attr(test, serde(default = "unset_connection_checkout_failed_reason"))]
234    pub reason: ConnectionCheckoutFailedReason,
235
236    /// If the `reason` connection checkout failed was `ConnectionError`,the associated
237    /// error is contained here. This is attached so we can include it in log messages;
238    /// in future work we may add this to public API on the event itself. TODO: DRIVERS-2495
239    #[cfg(feature = "tracing-unstable")]
240    #[serde(skip)]
241    #[derive_where(skip)]
242    pub(crate) error: Option<crate::error::Error>,
243
244    /// See [ConnectionCheckedOutEvent::duration].
245    #[serde(skip_deserializing)]
246    pub duration: Duration,
247}
248
249/// The reasons a connection may not be able to be checked out.
250#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
251#[serde(rename_all = "camelCase")]
252#[non_exhaustive]
253pub enum ConnectionCheckoutFailedReason {
254    /// The `wait_queue_timeout` has elapsed while waiting for a connection to be available.
255    Timeout,
256
257    /// An error occurred while trying to establish a connection (e.g. during the handshake or
258    /// authentication).
259    ConnectionError,
260
261    #[cfg(test)]
262    /// The value was not set in the test file.
263    Unset,
264}
265
266#[cfg(test)]
267fn unset_connection_checkout_failed_reason() -> ConnectionCheckoutFailedReason {
268    ConnectionCheckoutFailedReason::Unset
269}
270
271/// Event emitted when a connection is successfully checked out.
272#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
273#[serde(rename_all = "camelCase")]
274#[non_exhaustive]
275pub struct ConnectionCheckedOutEvent {
276    /// The address of the server that the connection will connect to.
277    #[serde(default = "self::empty_address")]
278    #[serde(skip_deserializing)]
279    pub address: ServerAddress,
280
281    /// The unique ID of the connection. This is not used for anything internally, but can be used
282    /// to identify other events related to this connection.
283    #[serde(default = "default_connection_id")]
284    pub connection_id: u32,
285
286    /// The time it took to check out the connection.
287    #[serde(skip_deserializing)]
288    pub duration: Duration,
289}
290
291/// Event emitted when a connection is checked back into a connection pool.
292#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
293#[serde(rename_all = "camelCase")]
294#[non_exhaustive]
295pub struct ConnectionCheckedInEvent {
296    /// The address of the server that the connection was connected to.
297    #[serde(default = "self::empty_address")]
298    #[serde(skip_deserializing)]
299    pub address: ServerAddress,
300
301    /// The unique ID of the connection. This is not used for anything internally, but can be used
302    /// to identify other events related to this connection.
303    #[serde(default = "default_connection_id")]
304    pub connection_id: u32,
305}
306
/// Supplies the connection ID used when events deserialized from test files do not specify one.
/// Test code treats this value as one that "matches" any connection ID.
fn default_connection_id() -> u32 {
    const PLACEHOLDER_CONNECTION_ID: u32 = 42;
    PLACEHOLDER_CONNECTION_ID
}
312
/// Usage of this trait is deprecated. Applications should use the [`EventHandler`] API.
///
/// Applications can implement this trait to specify custom logic to run on each CMAP event sent
/// by the driver. Every method has an empty default implementation, so implementors only need to
/// override the handlers for the events they care about.
///
/// ```rust
/// # #![allow(deprecated)]
/// # use std::sync::Arc;
/// #
/// # use mongodb::{
/// #     error::Result,
/// #     event::cmap::{
/// #         CmapEventHandler,
/// #         ConnectionCheckoutFailedEvent
/// #     },
/// #     options::ClientOptions,
/// # };
/// # #[cfg(feature = "sync")]
/// # use mongodb::sync::Client;
/// # #[cfg(not(feature = "sync"))]
/// # use mongodb::Client;
/// #
/// struct FailedCheckoutLogger;
///
/// impl CmapEventHandler for FailedCheckoutLogger {
///     fn handle_connection_checkout_failed_event(&self, event: ConnectionCheckoutFailedEvent) {
///         eprintln!("Failed connection checkout: {:?}", event);
///     }
/// }
///
/// # fn do_stuff() -> Result<()> {
/// let handler = Arc::new(FailedCheckoutLogger);
/// let options = ClientOptions::builder()
///                   .cmap_event_handler(handler)
///                   .build();
/// let client = Client::with_options(options)?;
///
/// // Do things with the client, and failed connection pool checkouts will be logged to stderr.
/// # Ok(())
/// # }
/// ```
#[deprecated = "use the EventHandler API"]
pub trait CmapEventHandler: Send + Sync {
    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is created.
    fn handle_pool_created_event(&self, _event: PoolCreatedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is marked as ready for use.
    ///
    /// Connections may not be created by or checked out from the pool until it has been marked as
    /// ready.
    fn handle_pool_ready_event(&self, _event: PoolReadyEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is cleared.
    fn handle_pool_cleared_event(&self, _event: PoolClearedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is closed.
    fn handle_pool_closed_event(&self, _event: PoolClosedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is created.
    fn handle_connection_created_event(&self, _event: ConnectionCreatedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is ready to be used.
    fn handle_connection_ready_event(&self, _event: ConnectionReadyEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is closed.
    fn handle_connection_closed_event(&self, _event: ConnectionClosedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a thread begins checking out a connection to use for an operation.
    fn handle_connection_checkout_started_event(&self, _event: ConnectionCheckoutStartedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a thread is unable to check out a connection.
    fn handle_connection_checkout_failed_event(&self, _event: ConnectionCheckoutFailedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is successfully checked out.
    fn handle_connection_checked_out_event(&self, _event: ConnectionCheckedOutEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is checked back into a connection pool.
    fn handle_connection_checked_in_event(&self, _event: ConnectionCheckedInEvent) {}
}
403
404#[derive(Clone, Debug, PartialEq, From)]
405#[non_exhaustive]
406#[allow(missing_docs)]
407pub enum CmapEvent {
408    PoolCreated(PoolCreatedEvent),
409    PoolReady(PoolReadyEvent),
410    PoolCleared(PoolClearedEvent),
411    PoolClosed(PoolClosedEvent),
412    ConnectionCreated(ConnectionCreatedEvent),
413    ConnectionReady(ConnectionReadyEvent),
414    ConnectionClosed(ConnectionClosedEvent),
415    ConnectionCheckoutStarted(ConnectionCheckoutStartedEvent),
416    ConnectionCheckoutFailed(ConnectionCheckoutFailedEvent),
417    ConnectionCheckedOut(ConnectionCheckedOutEvent),
418    ConnectionCheckedIn(ConnectionCheckedInEvent),
419}
420
/// Dispatches CMAP events to an optional event handler and, when the `tracing-unstable` feature
/// is enabled, to a connection tracing emitter as well.
#[derive(Clone)]
pub(crate) struct CmapEventEmitter {
    /// Handler that receives every emitted event, if one was provided (presumably the one
    /// registered by the user; confirm against the caller of `new`).
    user_handler: Option<EventHandler<CmapEvent>>,

    /// Emitter that receives events when debug-level connection tracing or logging is enabled.
    #[cfg(feature = "tracing-unstable")]
    tracing_emitter: ConnectionTracingEventEmitter,
}
428
429impl CmapEventEmitter {
430    pub(crate) fn new(
431        user_handler: Option<EventHandler<CmapEvent>>,
432        #[cfg(feature = "tracing-unstable")] topology_id: ObjectId,
433        #[cfg(feature = "tracing-unstable")] max_document_length_bytes: Option<usize>,
434    ) -> CmapEventEmitter {
435        Self {
436            user_handler,
437            #[cfg(feature = "tracing-unstable")]
438            tracing_emitter: ConnectionTracingEventEmitter::new(
439                topology_id,
440                max_document_length_bytes,
441            ),
442        }
443    }
444
445    #[cfg(not(feature = "tracing-unstable"))]
446    pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) {
447        if let Some(ref handler) = self.user_handler {
448            handler.handle(generate_event());
449        }
450    }
451
452    #[cfg(feature = "tracing-unstable")]
453    pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) {
454        // if the user isn't actually interested in debug-level connection messages, we shouldn't
455        // bother with the expense of generating and emitting these events.
456        let tracing_emitter_to_use = if trace_or_log_enabled!(
457            target: CONNECTION_TRACING_EVENT_TARGET,
458            TracingOrLogLevel::Debug
459        ) {
460            Some(&self.tracing_emitter)
461        } else {
462            None
463        };
464
465        match (&self.user_handler, tracing_emitter_to_use) {
466            (None, None) => {}
467            (None, Some(tracing_emitter)) => {
468                let event = generate_event();
469                tracing_emitter.handle(event);
470            }
471            (Some(user_handler), None) => {
472                let event = generate_event();
473                user_handler.handle(event);
474            }
475            (Some(user_handler), Some(tracing_emitter)) => {
476                let event = generate_event();
477                user_handler.handle(event.clone());
478                tracing_emitter.handle(event);
479            }
480        };
481    }
482}