1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
//! Contains the events and functionality for monitoring behavior of the connection pooling of a
//! `Client`.

use serde::Deserialize;

pub use crate::cmap::options::ConnectionPoolOptions;
use crate::options::StreamAddress;

/// We implement `Deserialize` for all of the event types so that we can more easily parse the CMAP
/// spec tests. However, we have no need to parse the address field from the JSON files (if it's
/// even present). To facilitate populating the address field with an empty value when
/// deserializing, we define a private `empty_address` function that the events can specify as the
/// custom deserialization value for each address field.
fn empty_address() -> StreamAddress {
    StreamAddress {
        hostname: Default::default(),
        port: None,
    }
}

/// Event emitted when a connection pool is created.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[non_exhaustive]
pub struct PoolCreatedEvent {
    /// The address of the server that the pool's connections will connect to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,

    /// The options used for the pool.
    pub options: Option<ConnectionPoolOptions>,
}

/// Event emitted when a connection pool becomes ready.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[non_exhaustive]
pub struct PoolReadyEvent {
    /// The address of the server that the pool's connections will connect to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,
}

/// Event emitted when a connection pool is cleared.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[non_exhaustive]
pub struct PoolClearedEvent {
    /// The address of the server that the pool's connections will connect to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,
}

/// Event emitted when a connection pool is cleared.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[non_exhaustive]
pub struct PoolClosedEvent {
    /// The address of the server that the pool's connections will connect to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,
}

/// Event emitted when a connection is created.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCreatedEvent {
    /// The address of the server that the connection will connect to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,

    /// The unique ID of the connection. This is not used for anything internally, but can be used
    /// to identify other events related to this connection.
    #[serde(default = "default_connection_id")]
    pub connection_id: u32,
}

/// Event emitted when a connection is ready to be used. This indicates that all the necessary
/// prerequisites for using a connection (handshake, authentication, etc.) have been completed.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionReadyEvent {
    /// The address of the server that the connection is connected to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,

    /// The unique ID of the connection. This is not used for anything internally, but can be used
    /// to identify other events related to this connection.
    #[serde(default = "default_connection_id")]
    pub connection_id: u32,
}

/// Event emitted when a connection is closed.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionClosedEvent {
    /// The address of the server that the connection was connected to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,

    /// The unique ID of the connection. This is not used for anything internally, but can be used
    /// to identify other events related to this connection.
    #[serde(default)]
    pub connection_id: u32,

    /// The reason that the connection was closed.
    pub reason: ConnectionClosedReason,
}

/// The reasons that a connection may be closed.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum ConnectionClosedReason {
    /// The connection pool has been cleared since the connection was created.
    Stale,

    /// The connection has been available for longer than `max_idle_time` without being used.
    Idle,

    /// An error occurred while using the connection.
    Error,

    /// The connection was dropped during read or write.
    Dropped,

    /// The pool that the connection belongs to has been closed.
    PoolClosed,
}

/// Event emitted when a thread begins checking out a connection to use for an operation.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[non_exhaustive]
pub struct ConnectionCheckoutStartedEvent {
    /// The address of the server that the connection will connect to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,
}

/// Event emitted when a thread is unable to check out a connection.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[non_exhaustive]
pub struct ConnectionCheckoutFailedEvent {
    /// The address of the server that the connection would have connected to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,

    /// The reason a connection was unable to be checked out.
    pub reason: ConnectionCheckoutFailedReason,
}

/// The reasons a connection may not be able to be checked out.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum ConnectionCheckoutFailedReason {
    /// The `wait_queue_timeout` has elapsed while waiting for a connection to be available.
    Timeout,

    /// An error occurred while trying to establish a connection (e.g. during the handshake or
    /// authentication).
    ConnectionError,
}

/// Event emitted when a connection is successfully checked out.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCheckedOutEvent {
    /// The address of the server that the connection will connect to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,

    /// The unique ID of the connection. This is not used for anything internally, but can be used
    /// to identify other events related to this connection.
    #[serde(default = "default_connection_id")]
    pub connection_id: u32,
}

/// Event emitted when a connection is checked back into a connection pool.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCheckedInEvent {
    /// The address of the server that the connection was connected to.
    #[serde(default = "self::empty_address")]
    #[serde(skip)]
    pub address: StreamAddress,

    /// The unique ID of the connection. This is not used for anything internally, but can be used
    /// to identify other events related to this connection.
    #[serde(default = "default_connection_id")]
    pub connection_id: u32,
}

/// The default connection ID to use for deserialization of events from test files.
/// This value will "match" any connection ID.
fn default_connection_id() -> u32 {
    42
}

/// Applications can implement this trait to specify custom logic to run on each CMAP event sent
/// by the driver.
///
/// ```rust
/// # use std::sync::Arc;
/// #
/// # use mongodb::{
/// #     error::Result,
/// #     event::cmap::{
/// #         CmapEventHandler,
/// #         ConnectionCheckoutFailedEvent
/// #     },
/// #     options::ClientOptions,
/// # };
/// # #[cfg(feature = "sync")]
/// # use mongodb::sync::Client;
/// # #[cfg(not(feature = "sync"))]
/// # use mongodb::Client;
/// #
/// struct FailedCheckoutLogger;
///
/// impl CmapEventHandler for FailedCheckoutLogger {
///     fn handle_connection_checkout_failed_event(&self, event: ConnectionCheckoutFailedEvent) {
///         eprintln!("Failed connection checkout: {:?}", event);
///     }
/// }
///
/// # fn do_stuff() -> Result<()> {
/// let handler: Arc<dyn CmapEventHandler> = Arc::new(FailedCheckoutLogger);
/// let options = ClientOptions::builder()
///                   .cmap_event_handler(handler)
///                   .build();
/// let client = Client::with_options(options)?;
///
/// // Do things with the client, and failed connection pool checkouts will be logged to stderr.
/// # Ok(())
/// # }
/// ```
pub trait CmapEventHandler: Send + Sync {
    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is created.
    fn handle_pool_created_event(&self, _event: PoolCreatedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool marked as ready for use.
    ///
    /// Connections may not be created by or checked out from the pool until it has been marked as
    /// ready.
    fn handle_pool_ready_event(&self, _event: PoolReadyEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is cleared.
    fn handle_pool_cleared_event(&self, _event: PoolClearedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection pool is cleared.
    fn handle_pool_closed_event(&self, _event: PoolClosedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is created.
    fn handle_connection_created_event(&self, _event: ConnectionCreatedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is ready to be used.
    fn handle_connection_ready_event(&self, _event: ConnectionReadyEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is closed.
    fn handle_connection_closed_event(&self, _event: ConnectionClosedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a thread begins checking out a connection to use for an operation.
    fn handle_connection_checkout_started_event(&self, _event: ConnectionCheckoutStartedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a thread is unable to check out a connection.
    fn handle_connection_checkout_failed_event(&self, _event: ConnectionCheckoutFailedEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is successfully checked out.
    fn handle_connection_checked_out_event(&self, _event: ConnectionCheckedOutEvent) {}

    /// A [`Client`](../../struct.Client.html) will call this method on each registered handler
    /// whenever a connection is checked back into a connection pool.
    fn handle_connection_checked_in_event(&self, _event: ConnectionCheckedInEvent) {}
}