Skip to main content

mongodb/
event.rs

1//! Contains the events and functionality for monitoring internal `Client` behavior.
2
3pub mod cmap;
4pub mod command;
5pub mod sdam;
6
7use std::sync::Arc;
8
9use futures_core::future::BoxFuture;
10
11use crate::event::command::CommandEvent;
12
13use self::{cmap::CmapEvent, sdam::SdamEvent};
14
15/// A destination for events.  Allows implicit conversion via [`From`] for concrete types for
16/// convenience with [`crate::options::ClientOptions`] construction:
17///
18/// ```rust
19/// # use mongodb::options::ClientOptions;
20/// # fn example() {
21/// let (tx, mut rx) = tokio::sync::mpsc::channel(100);
22/// tokio::spawn(async move {
23///     while let Some(ev) = rx.recv().await {
24///         println!("{:?}", ev);
25///     }
26/// });
27/// let options = ClientOptions::builder()
28///                 .command_event_handler(tx)
29///                 .build();
30/// # }
31/// ```
32///
33/// or explicit construction for `Fn` traits:
34///
35/// ```rust
36/// # use mongodb::options::ClientOptions;
37/// # use mongodb::event::EventHandler;
38/// # fn example() {
39/// let options = ClientOptions::builder()
40///                 .command_event_handler(EventHandler::callback(|ev| println!("{:?}", ev)))
41///                 .build();
42/// # }
43/// ```
44#[derive(Clone)]
45#[non_exhaustive]
46pub enum EventHandler<T> {
47    /// A callback.
48    Callback(Arc<dyn Fn(T) + Sync + Send>),
49    /// An async callback.
50    AsyncCallback(Arc<dyn Fn(T) -> BoxFuture<'static, ()> + Sync + Send>),
51    /// A `tokio` channel sender.
52    TokioMpsc(tokio::sync::mpsc::Sender<T>),
53}
54
55impl<T> std::fmt::Debug for EventHandler<T> {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_tuple("EventHandler").finish()
58    }
59}
60
61impl<T> From<tokio::sync::mpsc::Sender<T>> for EventHandler<T> {
62    fn from(value: tokio::sync::mpsc::Sender<T>) -> Self {
63        Self::TokioMpsc(value)
64    }
65}
66
67#[allow(deprecated)]
68impl<T: crate::event::command::CommandEventHandler + 'static> From<Arc<T>>
69    for EventHandler<CommandEvent>
70{
71    fn from(value: Arc<T>) -> Self {
72        Self::callback(move |ev| match ev {
73            CommandEvent::Started(e) => value.handle_command_started_event(e),
74            CommandEvent::Succeeded(e) => value.handle_command_succeeded_event(e),
75            CommandEvent::Failed(e) => value.handle_command_failed_event(e),
76        })
77    }
78}
79
80#[allow(deprecated)]
81impl<T: crate::event::cmap::CmapEventHandler + 'static> From<Arc<T>> for EventHandler<CmapEvent> {
82    fn from(value: Arc<T>) -> Self {
83        use CmapEvent::*;
84        Self::callback(move |ev| match ev {
85            PoolCreated(ev) => value.handle_pool_created_event(ev),
86            PoolReady(ev) => value.handle_pool_ready_event(ev),
87            PoolCleared(ev) => value.handle_pool_cleared_event(ev),
88            PoolClosed(ev) => value.handle_pool_closed_event(ev),
89            ConnectionCreated(ev) => value.handle_connection_created_event(ev),
90            ConnectionReady(ev) => value.handle_connection_ready_event(ev),
91            ConnectionClosed(ev) => value.handle_connection_closed_event(ev),
92            ConnectionCheckoutStarted(ev) => value.handle_connection_checkout_started_event(ev),
93            ConnectionCheckoutFailed(ev) => value.handle_connection_checkout_failed_event(ev),
94            ConnectionCheckedOut(ev) => value.handle_connection_checked_out_event(ev),
95            ConnectionCheckedIn(ev) => value.handle_connection_checked_in_event(ev),
96        })
97    }
98}
99
100#[allow(deprecated)]
101impl<T: crate::event::sdam::SdamEventHandler + 'static> From<Arc<T>> for EventHandler<SdamEvent> {
102    fn from(value: Arc<T>) -> Self {
103        use SdamEvent::*;
104        Self::callback(move |ev| match ev {
105            ServerDescriptionChanged(ev) => value.handle_server_description_changed_event(*ev),
106            ServerOpening(ev) => value.handle_server_opening_event(ev),
107            ServerClosed(ev) => value.handle_server_closed_event(ev),
108            TopologyDescriptionChanged(ev) => value.handle_topology_description_changed_event(*ev),
109            TopologyOpening(ev) => value.handle_topology_opening_event(ev),
110            TopologyClosed(ev) => value.handle_topology_closed_event(ev),
111            ServerHeartbeatStarted(ev) => value.handle_server_heartbeat_started_event(ev),
112            ServerHeartbeatSucceeded(ev) => value.handle_server_heartbeat_succeeded_event(ev),
113            ServerHeartbeatFailed(ev) => value.handle_server_heartbeat_failed_event(ev),
114        })
115    }
116}
117
118impl<T: Send + Sync + 'static> EventHandler<T> {
119    /// Construct a new event handler with a callback.
120    pub fn callback(f: impl Fn(T) + Send + Sync + 'static) -> Self {
121        Self::Callback(Arc::new(f))
122    }
123
124    /// Construct a new event handler with an async callback.
125    pub fn async_callback(f: impl Fn(T) -> BoxFuture<'static, ()> + Send + Sync + 'static) -> Self {
126        Self::AsyncCallback(Arc::new(f))
127    }
128
129    pub(crate) fn handle(&self, event: T) {
130        match self {
131            // TODO RUST-1731 Use tokio's spawn_blocking
132            Self::Callback(cb) => (cb)(event),
133            Self::AsyncCallback(cb) => {
134                crate::runtime::spawn((cb)(event));
135            }
136            Self::TokioMpsc(sender) => {
137                let sender = sender.clone();
138                crate::runtime::spawn(async move {
139                    let _ = sender.send(event).await;
140                });
141            }
142        }
143    }
144}