use std::time::Duration;
use derive_more::From;
#[cfg(feature = "tracing-unstable")]
use derive_where::derive_where;
use serde::{Deserialize, Serialize};
use crate::{bson::oid::ObjectId, options::ServerAddress, serde_util};
#[cfg(feature = "tracing-unstable")]
use crate::trace::{
connection::ConnectionTracingEventEmitter,
trace_or_log_enabled,
TracingOrLogLevel,
CONNECTION_TRACING_EVENT_TARGET,
};
use super::EventHandler;
/// Builds the placeholder address used for `address` fields that are skipped during
/// deserialization: the TCP variant with a default-valued host and no port.
fn empty_address() -> ServerAddress {
    let host = Default::default();
    ServerAddress::Tcp { host, port: None }
}
/// Event data describing the creation of a connection pool.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct PoolCreatedEvent {
    /// Server address associated with the pool.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,

    /// Options attached to the pool, if any.
    pub options: Option<ConnectionPoolOptions>,
}
/// Connection pool settings carried by [`PoolCreatedEvent`].
///
/// Field names are (de)serialized in camelCase unless explicitly renamed.
#[derive(Clone, Default, Deserialize, Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionPoolOptions {
    /// Maximum idle time, stored under the key `maxIdleTimeMS`.
    ///
    /// A missing key deserializes to `None`. Only deserialization is customized; it goes through
    /// `serde_util::deserialize_duration_option_from_u64_millis` (judging by its name, from an
    /// integer count of milliseconds).
    #[serde(
        rename = "maxIdleTimeMS",
        default,
        deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis"
    )]
    pub max_idle_time: Option<Duration>,

    /// Upper bound on the pool size, if set.
    pub max_pool_size: Option<u32>,

    /// Lower bound on the pool size, if set.
    pub min_pool_size: Option<u32>,
}
/// Event data describing a connection pool becoming ready.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct PoolReadyEvent {
    /// Server address associated with the pool.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,
}
/// Event data describing a connection pool being cleared.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct PoolClearedEvent {
    /// Server address associated with the pool.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,

    /// Optional service identifier.
    // NOTE(review): presumably identifies the backend behind a load balancer — confirm with the
    // code that populates this field.
    pub service_id: Option<ObjectId>,

    /// Flag carried with the clear; deserializes to `false` when absent from the input.
    // NOTE(review): by its name, indicates that in-use connections were interrupted — confirm
    // with the emitting code.
    #[serde(default)]
    pub interrupt_in_use_connections: bool,
}
/// Event data describing a connection pool being closed.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct PoolClosedEvent {
    /// Server address associated with the pool.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,
}
/// Event data describing the creation of a connection.
///
/// Field names are (de)serialized in camelCase.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCreatedEvent {
    /// Server address associated with the connection.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,

    /// Identifier of the connection; falls back to `default_connection_id()` when the input
    /// omits it.
    #[serde(default = "default_connection_id")]
    pub connection_id: u32,
}
/// Event data describing a connection becoming ready.
///
/// Field names are (de)serialized in camelCase.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionReadyEvent {
    /// Server address associated with the connection.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,

    /// Identifier of the connection; falls back to `default_connection_id()` when the input
    /// omits it.
    #[serde(default = "default_connection_id")]
    pub connection_id: u32,

    /// Duration attached to the event. Never read from the input when deserializing, so a
    /// deserialized event carries `Duration::default()`.
    // NOTE(review): presumably the time taken to establish the connection — confirm with the
    // emitting code.
    #[serde(skip_deserializing)]
    pub duration: Duration,
}
/// Event data describing the closing of a connection.
///
/// `PartialEq` is generated by `derive_where` when the `tracing-unstable` feature is enabled, so
/// that the skipped `error` field takes no part in comparisons, and by the standard derive
/// otherwise. Field names are (de)serialized in camelCase.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(feature = "tracing-unstable", derive_where(PartialEq))]
#[cfg_attr(not(feature = "tracing-unstable"), derive(PartialEq))]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionClosedEvent {
/// Server address associated with the connection. Never read from the input when
/// deserializing; `empty_address` supplies the value instead.
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
/// Identifier of the connection. Falls back to `u32::default()` (0) when the input omits it.
// NOTE(review): the other connection events in this file default this field through
// `default_connection_id` (42) rather than 0 — confirm the difference is intentional.
#[serde(default)]
pub connection_id: u32,
/// Why the connection was closed. In test builds a missing value deserializes to
/// `ConnectionClosedReason::Unset`; in other builds the field is required.
#[cfg_attr(test, serde(default = "unset_connection_closed_reason"))]
pub reason: ConnectionClosedReason,
/// Optional error attached to the event. Only present with the `tracing-unstable` feature;
/// excluded from (de)serialization and from the `derive_where`-generated `PartialEq`.
#[cfg(feature = "tracing-unstable")]
#[serde(skip)]
#[derive_where(skip)]
pub(crate) error: Option<crate::error::Error>,
/// Optional crate-internal service identifier.
// NOTE(review): presumably identifies the backend behind a load balancer — confirm with the
// code that populates this field.
pub(crate) service_id: Option<ObjectId>,
}
/// Reasons recorded on a [`ConnectionClosedEvent`].
///
/// Variant names are (de)serialized in camelCase, e.g. `PoolClosed` as `poolClosed`.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum ConnectionClosedReason {
/// Serialized as `stale`.
Stale,
/// Serialized as `idle`.
Idle,
/// Serialized as `error`.
Error,
/// Serialized as `dropped`.
Dropped,
/// Serialized as `poolClosed`.
PoolClosed,
/// Test-only placeholder, returned by `unset_connection_closed_reason` as the serde default
/// for a missing `reason`.
#[cfg(test)]
Unset,
}
/// Test-only serde default for `ConnectionClosedEvent::reason`: yields the `Unset` placeholder
/// when the input carries no reason.
#[cfg(test)]
fn unset_connection_closed_reason() -> ConnectionClosedReason {
ConnectionClosedReason::Unset
}
/// Event data describing the start of a connection checkout.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct ConnectionCheckoutStartedEvent {
    /// Server address associated with the checkout.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,
}
/// Event data describing a failed connection checkout.
///
/// `PartialEq` is generated by `derive_where` when the `tracing-unstable` feature is enabled, so
/// that the skipped `error` field takes no part in comparisons, and by the standard derive
/// otherwise.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[cfg_attr(feature = "tracing-unstable", derive_where(PartialEq))]
#[cfg_attr(not(feature = "tracing-unstable"), derive(PartialEq))]
#[non_exhaustive]
pub struct ConnectionCheckoutFailedEvent {
/// Server address associated with the checkout. Never read from the input when deserializing;
/// `empty_address` supplies the value instead.
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
/// Why the checkout failed. In test builds a missing value deserializes to
/// `ConnectionCheckoutFailedReason::Unset`; in other builds the field is required.
#[cfg_attr(test, serde(default = "unset_connection_checkout_failed_reason"))]
pub reason: ConnectionCheckoutFailedReason,
/// Optional error attached to the event. Only present with the `tracing-unstable` feature;
/// excluded from (de)serialization and from the `derive_where`-generated `PartialEq`.
#[cfg(feature = "tracing-unstable")]
#[serde(skip)]
#[derive_where(skip)]
pub(crate) error: Option<crate::error::Error>,
/// Duration attached to the event. Never read from the input when deserializing, so a
/// deserialized event carries `Duration::default()`.
// NOTE(review): presumably the time spent before the checkout failed — confirm with the
// emitting code.
#[serde(skip_deserializing)]
pub duration: Duration,
}
/// Reasons recorded on a [`ConnectionCheckoutFailedEvent`].
///
/// Variant names are (de)serialized in camelCase, e.g. `ConnectionError` as `connectionError`.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum ConnectionCheckoutFailedReason {
/// Serialized as `timeout`.
Timeout,
/// Serialized as `connectionError`.
ConnectionError,
/// Test-only placeholder, returned by `unset_connection_checkout_failed_reason` as the serde
/// default for a missing `reason`.
#[cfg(test)]
Unset,
}
/// Test-only serde default for `ConnectionCheckoutFailedEvent::reason`: yields the `Unset`
/// placeholder when the input carries no reason.
#[cfg(test)]
fn unset_connection_checkout_failed_reason() -> ConnectionCheckoutFailedReason {
ConnectionCheckoutFailedReason::Unset
}
/// Event data describing a connection being checked out.
///
/// Field names are (de)serialized in camelCase.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCheckedOutEvent {
    /// Server address associated with the connection.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,

    /// Identifier of the connection; falls back to `default_connection_id()` when the input
    /// omits it.
    #[serde(default = "default_connection_id")]
    pub connection_id: u32,

    /// Duration attached to the event. Never read from the input when deserializing, so a
    /// deserialized event carries `Duration::default()`.
    // NOTE(review): presumably how long the checkout took — confirm with the emitting code.
    #[serde(skip_deserializing)]
    pub duration: Duration,
}
/// Event data describing a connection being checked back in.
///
/// Field names are (de)serialized in camelCase.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCheckedInEvent {
    /// Server address associated with the connection.
    ///
    /// Never read from the input when deserializing; `empty_address` supplies the value instead.
    #[serde(default = "self::empty_address", skip_deserializing)]
    pub address: ServerAddress,

    /// Identifier of the connection; falls back to `default_connection_id()` when the input
    /// omits it.
    #[serde(default = "default_connection_id")]
    pub connection_id: u32,
}
/// Serde default for `connection_id` fields: the value used when the input omits the id.
// NOTE(review): 42 looks like a deliberate placeholder value — confirm what consumers of
// deserialized events expect it to mean.
fn default_connection_id() -> u32 {
    const FALLBACK_CONNECTION_ID: u32 = 42;
    FALLBACK_CONNECTION_ID
}
/// Trait with one handler method per pool/connection event type.
///
/// Deprecated in favor of the `EventHandler` API (see the deprecation message). Every method
/// has an empty default body, so implementors only override the events they care about.
/// Implementors must be `Send + Sync`.
#[deprecated = "use the EventHandler API"]
pub trait CmapEventHandler: Send + Sync {
/// Handler for [`PoolCreatedEvent`]; does nothing by default.
fn handle_pool_created_event(&self, _event: PoolCreatedEvent) {}
/// Handler for [`PoolReadyEvent`]; does nothing by default.
fn handle_pool_ready_event(&self, _event: PoolReadyEvent) {}
/// Handler for [`PoolClearedEvent`]; does nothing by default.
fn handle_pool_cleared_event(&self, _event: PoolClearedEvent) {}
/// Handler for [`PoolClosedEvent`]; does nothing by default.
fn handle_pool_closed_event(&self, _event: PoolClosedEvent) {}
/// Handler for [`ConnectionCreatedEvent`]; does nothing by default.
fn handle_connection_created_event(&self, _event: ConnectionCreatedEvent) {}
/// Handler for [`ConnectionReadyEvent`]; does nothing by default.
fn handle_connection_ready_event(&self, _event: ConnectionReadyEvent) {}
/// Handler for [`ConnectionClosedEvent`]; does nothing by default.
fn handle_connection_closed_event(&self, _event: ConnectionClosedEvent) {}
/// Handler for [`ConnectionCheckoutStartedEvent`]; does nothing by default.
fn handle_connection_checkout_started_event(&self, _event: ConnectionCheckoutStartedEvent) {}
/// Handler for [`ConnectionCheckoutFailedEvent`]; does nothing by default.
fn handle_connection_checkout_failed_event(&self, _event: ConnectionCheckoutFailedEvent) {}
/// Handler for [`ConnectionCheckedOutEvent`]; does nothing by default.
fn handle_connection_checked_out_event(&self, _event: ConnectionCheckedOutEvent) {}
/// Handler for [`ConnectionCheckedInEvent`]; does nothing by default.
fn handle_connection_checked_in_event(&self, _event: ConnectionCheckedInEvent) {}
}
/// Sum type wrapping the pool and connection event structs, one variant per event type.
///
/// The `From` derive (from `derive_more`) provides a conversion from each wrapped event struct
/// into the matching variant.
#[derive(Clone, Debug, PartialEq, From)]
#[non_exhaustive]
#[allow(missing_docs)]
pub enum CmapEvent {
PoolCreated(PoolCreatedEvent),
PoolReady(PoolReadyEvent),
PoolCleared(PoolClearedEvent),
PoolClosed(PoolClosedEvent),
ConnectionCreated(ConnectionCreatedEvent),
ConnectionReady(ConnectionReadyEvent),
ConnectionClosed(ConnectionClosedEvent),
ConnectionCheckoutStarted(ConnectionCheckoutStartedEvent),
ConnectionCheckoutFailed(ConnectionCheckoutFailedEvent),
ConnectionCheckedOut(ConnectionCheckedOutEvent),
ConnectionCheckedIn(ConnectionCheckedInEvent),
}
/// Crate-internal dispatcher for [`CmapEvent`]s.
///
/// Holds the optional user-supplied handler and, when the `tracing-unstable` feature is
/// enabled, a tracing emitter as a second consumer.
#[derive(Clone)]
pub(crate) struct CmapEventEmitter {
// Optional user-supplied handler; `None` means there is no user consumer for events.
user_handler: Option<EventHandler<CmapEvent>>,
// Tracing consumer; only compiled in with the `tracing-unstable` feature.
#[cfg(feature = "tracing-unstable")]
tracing_emitter: ConnectionTracingEventEmitter,
}
impl CmapEventEmitter {
pub(crate) fn new(
user_handler: Option<EventHandler<CmapEvent>>,
#[cfg(feature = "tracing-unstable")] topology_id: ObjectId,
#[cfg(feature = "tracing-unstable")] max_document_length_bytes: Option<usize>,
) -> CmapEventEmitter {
Self {
user_handler,
#[cfg(feature = "tracing-unstable")]
tracing_emitter: ConnectionTracingEventEmitter::new(
topology_id,
max_document_length_bytes,
),
}
}
#[cfg(not(feature = "tracing-unstable"))]
pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) {
if let Some(ref handler) = self.user_handler {
handler.handle(generate_event());
}
}
#[cfg(feature = "tracing-unstable")]
pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) {
let tracing_emitter_to_use = if trace_or_log_enabled!(
target: CONNECTION_TRACING_EVENT_TARGET,
TracingOrLogLevel::Debug
) {
Some(&self.tracing_emitter)
} else {
None
};
match (&self.user_handler, tracing_emitter_to_use) {
(None, None) => {}
(None, Some(tracing_emitter)) => {
let event = generate_event();
tracing_emitter.handle(event);
}
(Some(user_handler), None) => {
let event = generate_event();
user_handler.handle(event);
}
(Some(user_handler), Some(tracing_emitter)) => {
let event = generate_event();
user_handler.handle(event.clone());
tracing_emitter.handle(event);
}
};
}
}