1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
use std::sync::mpsc::TryRecvError;
use async_std::sync::Mutex;
use async_trait::async_trait;
use std::fmt::Debug;
use crate::asynchronous::basic::BasicAsyncMediator;
use super::*;
/// Context aware async mediator for asynchronous environments with events of type `Ev`.
///
/// Uses an underlying [`BasicAsyncMediator`] for base functionality
/// and a `Mutex` to store the user-defined dependency `Dep`.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use mediator_sys::asynchronous::contextaware::*;
/// use std::sync::Arc;
/// use async_trait::async_trait;
/// use async_std;
///
/// #[derive(Debug, Clone)]
/// enum MyEvent {
/// One,
/// Two
/// }
///
/// #[derive(Debug, Default)]
/// struct MyContext(Arc<u32>);
///
/// struct Request(u32);
///
/// #[async_trait]
/// impl CxAwareAsyncRequestHandler<MyContext, Request, MyEvent> for CxAwareAsyncMediator<MyContext, MyEvent> {
/// async fn handle(&self, req: Request, dep: &MyContext) {
/// let my_context: u32 = *dep.0;
/// match req.0 {
/// 1 => self.publish(MyEvent::One).await,
/// 2 => self.publish(MyEvent::Two).await,
/// _ => ()
/// };
/// }
/// }
///
/// async_std::task::block_on(async {
/// let mediator = CxAwareAsyncMediator::<MyContext, MyEvent>::builder()
/// .add_listener(move |ev| {
/// /* Your listening logic */
/// })
/// .add_listener(move |ev| {
/// /* Your listening logic */
/// })
/// .add_dependency(MyContext::default())
/// .build()
/// .unwrap();
///
/// mediator.send(Request(1)).await;
/// mediator.next().await.ok();
/// });
///
#[cfg(feature = "async")]
#[derive(Debug)]
pub struct CxAwareAsyncMediator<Dep, Ev>
where
Dep: Debug,
Ev: Debug,
{
pub(crate) basic: BasicAsyncMediator<Ev>,
pub(crate) dep: Mutex<Dep>,
}
#[async_trait]
impl<Dep, Ev> AsyncMediatorInternal<Ev> for CxAwareAsyncMediator<Dep, Ev>
where
Dep: Debug + Send,
Ev: Debug + Send,
{
/// Publishes an event `Ev` asynchronously.
///
/// This method instructs the underlying [`BasicAsyncMediator`]
/// to publish a user-defined event.
///
/// It should be used within [`CxAwareAsyncRequestHandler::handle()`].
///
/// You need to await the `Future` using `.await`.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use mediator_sys::asynchronous::contextaware::*;
/// use async_trait::async_trait;
/// use std::sync::Arc;
///
/// #[derive(Debug, Clone)]
/// enum MyEvent {
/// One,
/// Two
/// }
///
/// #[derive(Debug, Default)]
/// struct MyContext(Arc<u32>);
///
/// struct Request(u32);
///
/// #[async_trait]
/// impl CxAwareAsyncRequestHandler<MyContext, Request, MyEvent> for CxAwareAsyncMediator<MyContext, MyEvent> {
/// async fn handle(&self, req: Request, dep: &MyContext) {
/// let my_context: u32 = *dep.0;
/// match req.0 {
/// 1 => self.publish(MyEvent::One).await,
/// 2 => self.publish(MyEvent::Two).await,
/// _ => ()
/// };
/// }
/// }
///
async fn publish(&self, event: Ev) {
self.basic.publish(event).await
}
}
#[async_trait]
impl<Dep, Ev> CxAwareAsyncMediatorInternalHandle<Dep, Ev> for CxAwareAsyncMediator<Dep, Ev>
where
Dep: Debug + Send + Sync,
Ev: Debug + Send,
{
/// Send a request of type `Req` to the mediator asynchronously.
///
/// The request will be processed internally by [`CxAwareAsyncRequestHandler::handle()`].
/// This is why it is required to implement [`CxAwareAsyncRequestHandler`] for [`CxAwareAsyncMediator`].
/// A `Mutex` will be locked in order to gain access to the context `Dep`.
///
/// You need to await the `Future` using `.await`.
///
async fn send<Req>(&self, req: Req)
where
Self: CxAwareAsyncRequestHandler<Dep, Req, Ev>,
Req: Send,
{
let m = self.dep.lock().await;
<Self as CxAwareAsyncRequestHandler<Dep, Req, Ev>>::handle(self, req, &m).await
}
}
#[async_trait]
impl<Dep, Ev> AsyncMediatorInternalNext for CxAwareAsyncMediator<Dep, Ev>
where
Dep: Debug + Send,
Ev: Debug + Clone + Send,
{
/// Process the next published event `Ev` asynchronously.
///
/// This method instructs the underlying [`BasicAsyncMediator`]
/// to process the next event.
///
/// See [`BasicAsyncMediator::next()`] for more info.
///
/// You need to await the `Future` using `.await`.
///
async fn next(&self) -> Result<(), TryRecvError> {
self.basic.next().await
}
}