mongodb/event/cmap.rs
1//! Contains the events and functionality for monitoring behavior of the connection pooling of a
2//! `Client`.
3
4use std::time::Duration;
5
6use derive_more::From;
7#[cfg(feature = "tracing-unstable")]
8use derive_where::derive_where;
9use serde::{Deserialize, Serialize};
10
11use crate::{bson::oid::ObjectId, options::ServerAddress, serde_util};
12
13#[cfg(feature = "tracing-unstable")]
14use crate::trace::{
15 connection::ConnectionTracingEventEmitter,
16 trace_or_log_enabled,
17 TracingOrLogLevel,
18 CONNECTION_TRACING_EVENT_TARGET,
19};
20
21use super::EventHandler;
22
23/// We implement `Deserialize` for all of the event types so that we can more easily parse the CMAP
24/// spec tests. However, we have no need to parse the address field from the JSON files (if it's
25/// even present). To facilitate populating the address field with an empty value when
26/// deserializing, we define a private `empty_address` function that the events can specify as the
27/// custom deserialization value for each address field.
28fn empty_address() -> ServerAddress {
29 ServerAddress::Tcp {
30 host: Default::default(),
31 port: None,
32 }
33}
34
35/// Event emitted when a connection pool is created.
36#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
37#[non_exhaustive]
38pub struct PoolCreatedEvent {
39 /// The address of the server that the pool's connections will connect to.
40 #[serde(default = "self::empty_address")]
41 #[serde(skip_deserializing)]
42 pub address: ServerAddress,
43
44 /// The options used for the pool.
45 pub options: Option<ConnectionPoolOptions>,
46}
47
48/// Contains the options for creating a connection pool. While these options are specified at the
49/// client-level, `ConnectionPoolOptions` is exposed for the purpose of CMAP event handling.
50#[derive(Clone, Default, Deserialize, Debug, PartialEq, Serialize)]
51#[serde(rename_all = "camelCase")]
52#[non_exhaustive]
53pub struct ConnectionPoolOptions {
54 /// Connections that have been ready for usage in the pool for longer than `max_idle_time` will
55 /// not be used.
56 ///
57 /// The default is that connections will not be closed due to being idle.
58 #[serde(rename = "maxIdleTimeMS")]
59 #[serde(default)]
60 #[serde(deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis")]
61 pub max_idle_time: Option<Duration>,
62
63 /// The maximum number of connections that the pool can have at a given time. This includes
64 /// connections which are currently checked out of the pool.
65 ///
66 /// The default is 10.
67 pub max_pool_size: Option<u32>,
68
69 /// The minimum number of connections that the pool can have at a given time. This includes
70 /// connections which are currently checked out of the pool. If fewer than `min_pool_size`
71 /// connections are in the pool, connections will be added to the pool in the background.
72 ///
73 /// The default is that no minimum is enforced
74 pub min_pool_size: Option<u32>,
75}
76
77/// Event emitted when a connection pool becomes ready.
78#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
79#[non_exhaustive]
80pub struct PoolReadyEvent {
81 /// The address of the server that the pool's connections will connect to.
82 #[serde(default = "self::empty_address")]
83 #[serde(skip_deserializing)]
84 pub address: ServerAddress,
85}
86
87/// Event emitted when a connection pool is cleared.
88#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
89#[non_exhaustive]
90pub struct PoolClearedEvent {
91 /// The address of the server that the pool's connections will connect to.
92 #[serde(default = "self::empty_address")]
93 #[serde(skip_deserializing)]
94 pub address: ServerAddress,
95
96 /// If the connection is to a load balancer, the id of the selected backend.
97 pub service_id: Option<ObjectId>,
98
99 /// Whether in-use connections were interrupted when the pool cleared.
100 #[serde(default)]
101 pub interrupt_in_use_connections: bool,
102}
103
104/// Event emitted when a connection pool is cleared.
105#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
106#[non_exhaustive]
107pub struct PoolClosedEvent {
108 /// The address of the server that the pool's connections will connect to.
109 #[serde(default = "self::empty_address")]
110 #[serde(skip_deserializing)]
111 pub address: ServerAddress,
112}
113
114/// Event emitted when a connection is created.
115#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
116#[serde(rename_all = "camelCase")]
117#[non_exhaustive]
118pub struct ConnectionCreatedEvent {
119 /// The address of the server that the connection will connect to.
120 #[serde(default = "self::empty_address")]
121 #[serde(skip_deserializing)]
122 pub address: ServerAddress,
123
124 /// The unique ID of the connection. This is not used for anything internally, but can be used
125 /// to identify other events related to this connection.
126 #[serde(default = "default_connection_id")]
127 pub connection_id: u32,
128}
129
130/// Event emitted when a connection is ready to be used. This indicates that all the necessary
131/// prerequisites for using a connection (handshake, authentication, etc.) have been completed.
132#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
133#[serde(rename_all = "camelCase")]
134#[non_exhaustive]
135pub struct ConnectionReadyEvent {
136 /// The address of the server that the connection is connected to.
137 #[serde(default = "self::empty_address")]
138 #[serde(skip_deserializing)]
139 pub address: ServerAddress,
140
141 /// The unique ID of the connection. This is not used for anything internally, but can be used
142 /// to identify other events related to this connection.
143 #[serde(default = "default_connection_id")]
144 pub connection_id: u32,
145
146 /// The time it took to establish the connection.
147 #[serde(skip_deserializing)]
148 pub duration: Duration,
149}
150
151/// Event emitted when a connection is closed.
152#[derive(Clone, Debug, Deserialize, Serialize)]
153#[cfg_attr(feature = "tracing-unstable", derive_where(PartialEq))]
154#[cfg_attr(not(feature = "tracing-unstable"), derive(PartialEq))]
155#[serde(rename_all = "camelCase")]
156#[non_exhaustive]
157pub struct ConnectionClosedEvent {
158 /// The address of the server that the connection was connected to.
159 #[serde(default = "self::empty_address")]
160 #[serde(skip_deserializing)]
161 pub address: ServerAddress,
162
163 /// The unique ID of the connection. This is not used for anything internally, but can be used
164 /// to identify other events related to this connection.
165 #[serde(default)]
166 pub connection_id: u32,
167
168 /// The reason that the connection was closed.
169 #[cfg_attr(test, serde(default = "unset_connection_closed_reason"))]
170 pub reason: ConnectionClosedReason,
171
172 /// If the `reason` connection checkout failed was `Error`,the associated
173 /// error is contained here. This is attached so we can include it in log messages;
174 /// in future work we may add this to public API on the event itself. TODO: DRIVERS-2495
175 #[cfg(feature = "tracing-unstable")]
176 #[serde(skip)]
177 #[derive_where(skip)]
178 pub(crate) error: Option<crate::error::Error>,
179}
180
181/// The reasons that a connection may be closed.
182#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
183#[serde(rename_all = "camelCase")]
184#[non_exhaustive]
185pub enum ConnectionClosedReason {
186 /// The connection pool has been cleared since the connection was created.
187 Stale,
188
189 /// The connection has been available for longer than `max_idle_time` without being used.
190 Idle,
191
192 /// An error occurred while using the connection.
193 Error,
194
195 /// The connection was dropped during read or write.
196 Dropped,
197
198 /// The pool that the connection belongs to has been closed.
199 PoolClosed,
200
201 #[cfg(test)]
202 /// The value was not set in the test file.
203 Unset,
204}
205
/// Serde default for `ConnectionClosedEvent::reason` in test builds: yields the `Unset` variant
/// for events whose reason is absent from the input.
#[cfg(test)]
fn unset_connection_closed_reason() -> ConnectionClosedReason {
    ConnectionClosedReason::Unset
}
210
211/// Event emitted when a thread begins checking out a connection to use for an operation.
212#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
213#[non_exhaustive]
214pub struct ConnectionCheckoutStartedEvent {
215 /// The address of the server that the connection will connect to.
216 #[serde(default = "self::empty_address")]
217 #[serde(skip_deserializing)]
218 pub address: ServerAddress,
219}
220
221/// Event emitted when a thread is unable to check out a connection.
222#[derive(Clone, Debug, Deserialize, Serialize)]
223#[cfg_attr(feature = "tracing-unstable", derive_where(PartialEq))]
224#[cfg_attr(not(feature = "tracing-unstable"), derive(PartialEq))]
225#[non_exhaustive]
226pub struct ConnectionCheckoutFailedEvent {
227 /// The address of the server that the connection would have connected to.
228 #[serde(default = "self::empty_address")]
229 #[serde(skip_deserializing)]
230 pub address: ServerAddress,
231
232 /// The reason a connection was unable to be checked out.
233 #[cfg_attr(test, serde(default = "unset_connection_checkout_failed_reason"))]
234 pub reason: ConnectionCheckoutFailedReason,
235
236 /// If the `reason` connection checkout failed was `ConnectionError`,the associated
237 /// error is contained here. This is attached so we can include it in log messages;
238 /// in future work we may add this to public API on the event itself. TODO: DRIVERS-2495
239 #[cfg(feature = "tracing-unstable")]
240 #[serde(skip)]
241 #[derive_where(skip)]
242 pub(crate) error: Option<crate::error::Error>,
243
244 /// See [ConnectionCheckedOutEvent::duration].
245 #[serde(skip_deserializing)]
246 pub duration: Duration,
247}
248
249/// The reasons a connection may not be able to be checked out.
250#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
251#[serde(rename_all = "camelCase")]
252#[non_exhaustive]
253pub enum ConnectionCheckoutFailedReason {
254 /// The `wait_queue_timeout` has elapsed while waiting for a connection to be available.
255 Timeout,
256
257 /// An error occurred while trying to establish a connection (e.g. during the handshake or
258 /// authentication).
259 ConnectionError,
260
261 #[cfg(test)]
262 /// The value was not set in the test file.
263 Unset,
264}
265
/// Serde default for `ConnectionCheckoutFailedEvent::reason` in test builds: yields the `Unset`
/// variant for events whose reason is absent from the input.
#[cfg(test)]
fn unset_connection_checkout_failed_reason() -> ConnectionCheckoutFailedReason {
    ConnectionCheckoutFailedReason::Unset
}
270
271/// Event emitted when a connection is successfully checked out.
272#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
273#[serde(rename_all = "camelCase")]
274#[non_exhaustive]
275pub struct ConnectionCheckedOutEvent {
276 /// The address of the server that the connection will connect to.
277 #[serde(default = "self::empty_address")]
278 #[serde(skip_deserializing)]
279 pub address: ServerAddress,
280
281 /// The unique ID of the connection. This is not used for anything internally, but can be used
282 /// to identify other events related to this connection.
283 #[serde(default = "default_connection_id")]
284 pub connection_id: u32,
285
286 /// The time it took to check out the connection.
287 #[serde(skip_deserializing)]
288 pub duration: Duration,
289}
290
291/// Event emitted when a connection is checked back into a connection pool.
292#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
293#[serde(rename_all = "camelCase")]
294#[non_exhaustive]
295pub struct ConnectionCheckedInEvent {
296 /// The address of the server that the connection was connected to.
297 #[serde(default = "self::empty_address")]
298 #[serde(skip_deserializing)]
299 pub address: ServerAddress,
300
301 /// The unique ID of the connection. This is not used for anything internally, but can be used
302 /// to identify other events related to this connection.
303 #[serde(default = "default_connection_id")]
304 pub connection_id: u32,
305}
306
/// Connection ID assigned to events deserialized from test files that do not specify one.
///
/// This value will "match" any connection ID.
fn default_connection_id() -> u32 {
    const WILDCARD_CONNECTION_ID: u32 = 42;
    WILDCARD_CONNECTION_ID
}
312
/// Usage of this trait is deprecated. Applications should use the [`EventHandler`] API.
///
/// Applications can implement this trait to specify custom logic to run on each CMAP event sent
/// by the driver. Every method has an empty default body, so implementors only need to override
/// the handlers for the events they care about.
///
/// ```rust
/// # #![allow(deprecated)]
/// # use std::sync::Arc;
/// #
/// # use mongodb::{
/// #     error::Result,
/// #     event::cmap::{
/// #         CmapEventHandler,
/// #         ConnectionCheckoutFailedEvent
/// #     },
/// #     options::ClientOptions,
/// # };
/// # #[cfg(feature = "sync")]
/// # use mongodb::sync::Client;
/// # #[cfg(not(feature = "sync"))]
/// # use mongodb::Client;
/// #
/// struct FailedCheckoutLogger;
///
/// impl CmapEventHandler for FailedCheckoutLogger {
///     fn handle_connection_checkout_failed_event(&self, event: ConnectionCheckoutFailedEvent) {
///         eprintln!("Failed connection checkout: {:?}", event);
///     }
/// }
///
/// # fn do_stuff() -> Result<()> {
/// let handler = Arc::new(FailedCheckoutLogger);
/// let options = ClientOptions::builder()
///                   .cmap_event_handler(handler)
///                   .build();
/// let client = Client::with_options(options)?;
///
/// // Do things with the client, and failed connection pool checkouts will be logged to stderr.
/// # Ok(())
/// # }
/// ```
#[deprecated = "use the EventHandler API"]
pub trait CmapEventHandler: Send + Sync {
    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is created.
    fn handle_pool_created_event(&self, _event: PoolCreatedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is marked as ready for use.
    ///
    /// Connections may not be created by or checked out from the pool until it has been marked as
    /// ready.
    fn handle_pool_ready_event(&self, _event: PoolReadyEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is cleared.
    fn handle_pool_cleared_event(&self, _event: PoolClearedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is closed.
    fn handle_pool_closed_event(&self, _event: PoolClosedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is created.
    fn handle_connection_created_event(&self, _event: ConnectionCreatedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is ready to be used.
    fn handle_connection_ready_event(&self, _event: ConnectionReadyEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is closed.
    fn handle_connection_closed_event(&self, _event: ConnectionClosedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a thread begins checking out a connection to use for an operation.
    fn handle_connection_checkout_started_event(&self, _event: ConnectionCheckoutStartedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a thread is unable to check out a connection.
    fn handle_connection_checkout_failed_event(&self, _event: ConnectionCheckoutFailedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is successfully checked out.
    fn handle_connection_checked_out_event(&self, _event: ConnectionCheckedOutEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is checked back into a connection pool.
    fn handle_connection_checked_in_event(&self, _event: ConnectionCheckedInEvent) {}
}
403
/// A single CMAP (connection pool monitoring) event of any kind.
///
/// Each variant wraps the event struct of the same name defined in this module. The derived
/// `From` comes from `derive_more` and is presumably what lets each event struct be converted
/// into its variant (confirm against the `derive_more` documentation).
#[derive(Clone, Debug, PartialEq, From)]
#[non_exhaustive]
#[allow(missing_docs)]
pub enum CmapEvent {
    PoolCreated(PoolCreatedEvent),
    PoolReady(PoolReadyEvent),
    PoolCleared(PoolClearedEvent),
    PoolClosed(PoolClosedEvent),
    ConnectionCreated(ConnectionCreatedEvent),
    ConnectionReady(ConnectionReadyEvent),
    ConnectionClosed(ConnectionClosedEvent),
    ConnectionCheckoutStarted(ConnectionCheckoutStartedEvent),
    ConnectionCheckoutFailed(ConnectionCheckoutFailedEvent),
    ConnectionCheckedOut(ConnectionCheckedOutEvent),
    ConnectionCheckedIn(ConnectionCheckedInEvent),
}
420
/// Dispatches CMAP events to the optional user-supplied handler and, when the
/// `tracing-unstable` feature is enabled, to the connection tracing emitter.
#[derive(Clone)]
pub(crate) struct CmapEventEmitter {
    /// Handler registered by the user, if any.
    user_handler: Option<EventHandler<CmapEvent>>,

    /// Emitter that receives events when debug-level connection tracing/logging is enabled.
    #[cfg(feature = "tracing-unstable")]
    tracing_emitter: ConnectionTracingEventEmitter,
}
428
429impl CmapEventEmitter {
430 pub(crate) fn new(
431 user_handler: Option<EventHandler<CmapEvent>>,
432 #[cfg(feature = "tracing-unstable")] topology_id: ObjectId,
433 #[cfg(feature = "tracing-unstable")] max_document_length_bytes: Option<usize>,
434 ) -> CmapEventEmitter {
435 Self {
436 user_handler,
437 #[cfg(feature = "tracing-unstable")]
438 tracing_emitter: ConnectionTracingEventEmitter::new(
439 topology_id,
440 max_document_length_bytes,
441 ),
442 }
443 }
444
445 #[cfg(not(feature = "tracing-unstable"))]
446 pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) {
447 if let Some(ref handler) = self.user_handler {
448 handler.handle(generate_event());
449 }
450 }
451
452 #[cfg(feature = "tracing-unstable")]
453 pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) {
454 // if the user isn't actually interested in debug-level connection messages, we shouldn't
455 // bother with the expense of generating and emitting these events.
456 let tracing_emitter_to_use = if trace_or_log_enabled!(
457 target: CONNECTION_TRACING_EVENT_TARGET,
458 TracingOrLogLevel::Debug
459 ) {
460 Some(&self.tracing_emitter)
461 } else {
462 None
463 };
464
465 match (&self.user_handler, tracing_emitter_to_use) {
466 (None, None) => {}
467 (None, Some(tracing_emitter)) => {
468 let event = generate_event();
469 tracing_emitter.handle(event);
470 }
471 (Some(user_handler), None) => {
472 let event = generate_event();
473 user_handler.handle(event);
474 }
475 (Some(user_handler), Some(tracing_emitter)) => {
476 let event = generate_event();
477 user_handler.handle(event.clone());
478 tracing_emitter.handle(event);
479 }
480 };
481 }
482}