1pub mod cmap;
4pub mod command;
5pub mod sdam;
6
7use std::sync::Arc;
8
9use futures_core::future::BoxFuture;
10
11use crate::event::command::CommandEvent;
12
13use self::{cmap::CmapEvent, sdam::SdamEvent};
14
15#[derive(Clone)]
45#[non_exhaustive]
46pub enum EventHandler<T> {
47 Callback(Arc<dyn Fn(T) + Sync + Send>),
49 AsyncCallback(Arc<dyn Fn(T) -> BoxFuture<'static, ()> + Sync + Send>),
51 TokioMpsc(tokio::sync::mpsc::Sender<T>),
53}
54
55impl<T> std::fmt::Debug for EventHandler<T> {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_tuple("EventHandler").finish()
58 }
59}
60
61impl<T> From<tokio::sync::mpsc::Sender<T>> for EventHandler<T> {
62 fn from(value: tokio::sync::mpsc::Sender<T>) -> Self {
63 Self::TokioMpsc(value)
64 }
65}
66
67#[allow(deprecated)]
68impl<T: crate::event::command::CommandEventHandler + 'static> From<Arc<T>>
69 for EventHandler<CommandEvent>
70{
71 fn from(value: Arc<T>) -> Self {
72 Self::callback(move |ev| match ev {
73 CommandEvent::Started(e) => value.handle_command_started_event(e),
74 CommandEvent::Succeeded(e) => value.handle_command_succeeded_event(e),
75 CommandEvent::Failed(e) => value.handle_command_failed_event(e),
76 })
77 }
78}
79
80#[allow(deprecated)]
81impl<T: crate::event::cmap::CmapEventHandler + 'static> From<Arc<T>> for EventHandler<CmapEvent> {
82 fn from(value: Arc<T>) -> Self {
83 use CmapEvent::*;
84 Self::callback(move |ev| match ev {
85 PoolCreated(ev) => value.handle_pool_created_event(ev),
86 PoolReady(ev) => value.handle_pool_ready_event(ev),
87 PoolCleared(ev) => value.handle_pool_cleared_event(ev),
88 PoolClosed(ev) => value.handle_pool_closed_event(ev),
89 ConnectionCreated(ev) => value.handle_connection_created_event(ev),
90 ConnectionReady(ev) => value.handle_connection_ready_event(ev),
91 ConnectionClosed(ev) => value.handle_connection_closed_event(ev),
92 ConnectionCheckoutStarted(ev) => value.handle_connection_checkout_started_event(ev),
93 ConnectionCheckoutFailed(ev) => value.handle_connection_checkout_failed_event(ev),
94 ConnectionCheckedOut(ev) => value.handle_connection_checked_out_event(ev),
95 ConnectionCheckedIn(ev) => value.handle_connection_checked_in_event(ev),
96 })
97 }
98}
99
100#[allow(deprecated)]
101impl<T: crate::event::sdam::SdamEventHandler + 'static> From<Arc<T>> for EventHandler<SdamEvent> {
102 fn from(value: Arc<T>) -> Self {
103 use SdamEvent::*;
104 Self::callback(move |ev| match ev {
105 ServerDescriptionChanged(ev) => value.handle_server_description_changed_event(*ev),
106 ServerOpening(ev) => value.handle_server_opening_event(ev),
107 ServerClosed(ev) => value.handle_server_closed_event(ev),
108 TopologyDescriptionChanged(ev) => value.handle_topology_description_changed_event(*ev),
109 TopologyOpening(ev) => value.handle_topology_opening_event(ev),
110 TopologyClosed(ev) => value.handle_topology_closed_event(ev),
111 ServerHeartbeatStarted(ev) => value.handle_server_heartbeat_started_event(ev),
112 ServerHeartbeatSucceeded(ev) => value.handle_server_heartbeat_succeeded_event(ev),
113 ServerHeartbeatFailed(ev) => value.handle_server_heartbeat_failed_event(ev),
114 })
115 }
116}
117
118impl<T: Send + Sync + 'static> EventHandler<T> {
119 pub fn callback(f: impl Fn(T) + Send + Sync + 'static) -> Self {
121 Self::Callback(Arc::new(f))
122 }
123
124 pub fn async_callback(f: impl Fn(T) -> BoxFuture<'static, ()> + Send + Sync + 'static) -> Self {
126 Self::AsyncCallback(Arc::new(f))
127 }
128
129 pub(crate) fn handle(&self, event: T) {
130 match self {
131 Self::Callback(cb) => (cb)(event),
133 Self::AsyncCallback(cb) => {
134 crate::runtime::spawn((cb)(event));
135 }
136 Self::TokioMpsc(sender) => {
137 let sender = sender.clone();
138 crate::runtime::spawn(async move {
139 let _ = sender.send(event).await;
140 });
141 }
142 }
143 }
144}