pub mod cmap;
pub mod command;
pub mod sdam;
use std::sync::Arc;
use futures_core::future::BoxFuture;
use crate::event::command::CommandEvent;
use self::{cmap::CmapEvent, sdam::SdamEvent};
#[derive(Clone)]
#[non_exhaustive]
pub enum EventHandler<T> {
Callback(Arc<dyn Fn(T) + Sync + Send>),
AsyncCallback(Arc<dyn Fn(T) -> BoxFuture<'static, ()> + Sync + Send>),
TokioMpsc(tokio::sync::mpsc::Sender<T>),
}
impl<T> std::fmt::Debug for EventHandler<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("EventHandler").finish()
}
}
impl<T> From<tokio::sync::mpsc::Sender<T>> for EventHandler<T> {
fn from(value: tokio::sync::mpsc::Sender<T>) -> Self {
Self::TokioMpsc(value)
}
}
#[allow(deprecated)]
impl<T: crate::event::command::CommandEventHandler + 'static> From<Arc<T>>
for EventHandler<CommandEvent>
{
fn from(value: Arc<T>) -> Self {
Self::callback(move |ev| match ev {
CommandEvent::Started(e) => value.handle_command_started_event(e),
CommandEvent::Succeeded(e) => value.handle_command_succeeded_event(e),
CommandEvent::Failed(e) => value.handle_command_failed_event(e),
})
}
}
#[allow(deprecated)]
impl<T: crate::event::cmap::CmapEventHandler + 'static> From<Arc<T>> for EventHandler<CmapEvent> {
fn from(value: Arc<T>) -> Self {
use CmapEvent::*;
Self::callback(move |ev| match ev {
PoolCreated(ev) => value.handle_pool_created_event(ev),
PoolReady(ev) => value.handle_pool_ready_event(ev),
PoolCleared(ev) => value.handle_pool_cleared_event(ev),
PoolClosed(ev) => value.handle_pool_closed_event(ev),
ConnectionCreated(ev) => value.handle_connection_created_event(ev),
ConnectionReady(ev) => value.handle_connection_ready_event(ev),
ConnectionClosed(ev) => value.handle_connection_closed_event(ev),
ConnectionCheckoutStarted(ev) => value.handle_connection_checkout_started_event(ev),
ConnectionCheckoutFailed(ev) => value.handle_connection_checkout_failed_event(ev),
ConnectionCheckedOut(ev) => value.handle_connection_checked_out_event(ev),
ConnectionCheckedIn(ev) => value.handle_connection_checked_in_event(ev),
})
}
}
#[allow(deprecated)]
impl<T: crate::event::sdam::SdamEventHandler + 'static> From<Arc<T>> for EventHandler<SdamEvent> {
fn from(value: Arc<T>) -> Self {
use SdamEvent::*;
Self::callback(move |ev| match ev {
ServerDescriptionChanged(ev) => value.handle_server_description_changed_event(*ev),
ServerOpening(ev) => value.handle_server_opening_event(ev),
ServerClosed(ev) => value.handle_server_closed_event(ev),
TopologyDescriptionChanged(ev) => value.handle_topology_description_changed_event(*ev),
TopologyOpening(ev) => value.handle_topology_opening_event(ev),
TopologyClosed(ev) => value.handle_topology_closed_event(ev),
ServerHeartbeatStarted(ev) => value.handle_server_heartbeat_started_event(ev),
ServerHeartbeatSucceeded(ev) => value.handle_server_heartbeat_succeeded_event(ev),
ServerHeartbeatFailed(ev) => value.handle_server_heartbeat_failed_event(ev),
})
}
}
impl<T: Send + Sync + 'static> EventHandler<T> {
pub fn callback(f: impl Fn(T) + Send + Sync + 'static) -> Self {
Self::Callback(Arc::new(f))
}
pub fn async_callback(f: impl Fn(T) -> BoxFuture<'static, ()> + Send + Sync + 'static) -> Self {
Self::AsyncCallback(Arc::new(f))
}
pub(crate) fn handle(&self, event: T) {
match self {
Self::Callback(cb) => (cb)(event),
Self::AsyncCallback(cb) => {
crate::runtime::spawn((cb)(event));
}
Self::TokioMpsc(sender) => {
let sender = sender.clone();
crate::runtime::spawn(async move {
let _ = sender.send(event).await;
});
}
}
}
}