use std::{sync::Arc, time::Duration};
use serde::{Deserialize, Serialize};
use crate::{bson::oid::ObjectId, options::ServerAddress, serde_util};
use derivative::Derivative;
use derive_more::From;
#[cfg(feature = "tracing-unstable")]
use crate::trace::{
connection::ConnectionTracingEventEmitter,
trace_or_log_enabled,
TracingOrLogLevel,
CONNECTION_TRACING_EVENT_TARGET,
};
fn empty_address() -> ServerAddress {
ServerAddress::Tcp {
host: Default::default(),
port: None,
}
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct PoolCreatedEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
pub options: Option<ConnectionPoolOptions>,
}
#[derive(Clone, Default, Deserialize, Debug, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionPoolOptions {
#[serde(rename = "maxIdleTimeMS")]
#[serde(default)]
#[serde(deserialize_with = "serde_util::deserialize_duration_option_from_u64_millis")]
pub max_idle_time: Option<Duration>,
pub max_pool_size: Option<u32>,
pub min_pool_size: Option<u32>,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct PoolReadyEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct PoolClearedEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
pub service_id: Option<ObjectId>,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct PoolClosedEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCreatedEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
#[serde(default = "default_connection_id")]
pub connection_id: u32,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionReadyEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
#[serde(default = "default_connection_id")]
pub connection_id: u32,
#[serde(default = "Duration::default")]
pub duration: Duration,
}
#[derive(Clone, Debug, Deserialize, Derivative, Serialize)]
#[derivative(PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionClosedEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
#[serde(default)]
pub connection_id: u32,
pub reason: ConnectionClosedReason,
#[cfg(feature = "tracing-unstable")]
#[serde(skip)]
#[derivative(PartialEq = "ignore")]
pub(crate) error: Option<crate::error::Error>,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum ConnectionClosedReason {
Stale,
Idle,
Error,
Dropped,
PoolClosed,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[non_exhaustive]
pub struct ConnectionCheckoutStartedEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
}
#[derive(Clone, Debug, Deserialize, Derivative, Serialize)]
#[derivative(PartialEq)]
#[non_exhaustive]
pub struct ConnectionCheckoutFailedEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
pub reason: ConnectionCheckoutFailedReason,
#[cfg(feature = "tracing-unstable")]
#[serde(skip)]
#[derivative(PartialEq = "ignore")]
pub(crate) error: Option<crate::error::Error>,
#[serde(default = "Duration::default")]
pub duration: Duration,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub enum ConnectionCheckoutFailedReason {
Timeout,
ConnectionError,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCheckedOutEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
#[serde(default = "default_connection_id")]
pub connection_id: u32,
#[serde(default = "Duration::default")]
pub duration: Duration,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ConnectionCheckedInEvent {
#[serde(default = "self::empty_address")]
#[serde(skip_deserializing)]
pub address: ServerAddress,
#[serde(default = "default_connection_id")]
pub connection_id: u32,
}
fn default_connection_id() -> u32 {
42
}
pub trait CmapEventHandler: Send + Sync {
fn handle_pool_created_event(&self, _event: PoolCreatedEvent) {}
fn handle_pool_ready_event(&self, _event: PoolReadyEvent) {}
fn handle_pool_cleared_event(&self, _event: PoolClearedEvent) {}
fn handle_pool_closed_event(&self, _event: PoolClosedEvent) {}
fn handle_connection_created_event(&self, _event: ConnectionCreatedEvent) {}
fn handle_connection_ready_event(&self, _event: ConnectionReadyEvent) {}
fn handle_connection_closed_event(&self, _event: ConnectionClosedEvent) {}
fn handle_connection_checkout_started_event(&self, _event: ConnectionCheckoutStartedEvent) {}
fn handle_connection_checkout_failed_event(&self, _event: ConnectionCheckoutFailedEvent) {}
fn handle_connection_checked_out_event(&self, _event: ConnectionCheckedOutEvent) {}
fn handle_connection_checked_in_event(&self, _event: ConnectionCheckedInEvent) {}
}
#[derive(Clone, Debug, PartialEq, From)]
pub(crate) enum CmapEvent {
PoolCreated(PoolCreatedEvent),
PoolReady(PoolReadyEvent),
PoolCleared(PoolClearedEvent),
PoolClosed(PoolClosedEvent),
ConnectionCreated(ConnectionCreatedEvent),
ConnectionReady(ConnectionReadyEvent),
ConnectionClosed(ConnectionClosedEvent),
ConnectionCheckoutStarted(ConnectionCheckoutStartedEvent),
ConnectionCheckoutFailed(ConnectionCheckoutFailedEvent),
ConnectionCheckedOut(ConnectionCheckedOutEvent),
ConnectionCheckedIn(ConnectionCheckedInEvent),
}
#[derive(Clone)]
pub(crate) struct CmapEventEmitter {
user_handler: Option<Arc<dyn CmapEventHandler>>,
#[cfg(feature = "tracing-unstable")]
tracing_emitter: ConnectionTracingEventEmitter,
}
impl CmapEventEmitter {
#[allow(unused_variables)]
pub(crate) fn new(
user_handler: Option<Arc<dyn CmapEventHandler>>,
topology_id: ObjectId,
) -> CmapEventEmitter {
Self {
user_handler,
#[cfg(feature = "tracing-unstable")]
tracing_emitter: ConnectionTracingEventEmitter::new(topology_id),
}
}
#[cfg(not(feature = "tracing-unstable"))]
pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) {
if let Some(ref handler) = self.user_handler {
handle_cmap_event(handler.as_ref(), generate_event());
}
}
#[cfg(feature = "tracing-unstable")]
pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) {
let tracing_emitter_to_use = if trace_or_log_enabled!(
target: CONNECTION_TRACING_EVENT_TARGET,
TracingOrLogLevel::Debug
) {
Some(&self.tracing_emitter)
} else {
None
};
match (&self.user_handler, tracing_emitter_to_use) {
(None, None) => {}
(None, Some(tracing_emitter)) => {
let event = generate_event();
handle_cmap_event(tracing_emitter, event);
}
(Some(user_handler), None) => {
let event = generate_event();
handle_cmap_event(user_handler.as_ref(), event);
}
(Some(user_handler), Some(tracing_emitter)) => {
let event = generate_event();
handle_cmap_event(user_handler.as_ref(), event.clone());
handle_cmap_event(tracing_emitter, event);
}
};
}
}
fn handle_cmap_event(handler: &dyn CmapEventHandler, event: CmapEvent) {
match event {
CmapEvent::PoolCreated(event) => handler.handle_pool_created_event(event),
CmapEvent::PoolReady(event) => handler.handle_pool_ready_event(event),
CmapEvent::PoolCleared(event) => handler.handle_pool_cleared_event(event),
CmapEvent::PoolClosed(event) => handler.handle_pool_closed_event(event),
CmapEvent::ConnectionCreated(event) => handler.handle_connection_created_event(event),
CmapEvent::ConnectionReady(event) => handler.handle_connection_ready_event(event),
CmapEvent::ConnectionClosed(event) => handler.handle_connection_closed_event(event),
CmapEvent::ConnectionCheckoutStarted(event) => {
handler.handle_connection_checkout_started_event(event)
}
CmapEvent::ConnectionCheckoutFailed(event) => {
handler.handle_connection_checkout_failed_event(event)
}
CmapEvent::ConnectionCheckedOut(event) => {
handler.handle_connection_checked_out_event(event)
}
CmapEvent::ConnectionCheckedIn(event) => handler.handle_connection_checked_in_event(event),
}
}