async_event_streams/
pipes.rs

1use async_std::{stream::StreamExt, sync::RwLock};
2use async_trait::async_trait;
3use futures::{
4    future::RemoteHandle,
5    task::{Spawn, SpawnError, SpawnExt},
6};
7use std::{borrow::Cow, sync::Arc};
8
9use crate::{EventBox, EventStream};
10
11///
12/// Standartized interface for structures providing sream of events of specified type.
13///
14/// Typically this trait is implemented like this:
15///
16/// ```
17/// # use async_event_streams::{EventStream, EventStreams, EventSource};
18/// struct NumericSource {
19///     stream: EventStreams<usize>
20/// }
21///
22/// impl EventSource<usize> for NumericSource {
23///   fn event_stream(&self) -> EventStream<usize> {
24///     self.stream.create_event_stream()
25///   }
26/// }
27/// ```
28pub trait EventSource<EVT: Send + Sync + 'static> {
29    fn event_stream(&self) -> EventStream<EVT>;
30}
31
32impl<EVT: Send + Sync + 'static, T: EventSource<EVT>> EventSource<EVT> for Arc<T> {
33    fn event_stream(&self) -> EventStream<EVT> {
34        (**self).event_stream()
35    }
36}
37
38/// Standartized interface for object reacting to events of specific type. The trait have two methods: ```on_event_owned``` which accepts
39/// event object and ```on_event_ref```, accepting borrowed reference to event. It's supposed that both bethods should work identically.
40/// But sometimes if it is necessary to retranslate the event received. So it is effective to handle owned event case separately from borrowed.
41///
42/// See also [crate::EventSinkExt] trait which allows to implement only one event handler by using [std::borrow::Cow]
43#[async_trait]
44pub trait EventSink<EVT: Send + Sync + 'static>: Send + Sync {
45    type Error;
46    async fn on_event_owned(
47        &self,
48        event: EVT,
49        source: Option<Arc<EventBox>>,
50    ) -> Result<(), Self::Error>;
51    async fn on_event_ref(
52        &self,
53        event: &EVT,
54        source: Option<Arc<EventBox>>,
55    ) -> Result<(), Self::Error>;
56}
57
58#[async_trait]
59impl<EVT: Send + Sync + 'static, T: EventSink<EVT> + Send + Sync> EventSink<EVT> for Arc<T> {
60    type Error = T::Error;
61    async fn on_event_owned(
62        &self,
63        event: EVT,
64        source: Option<Arc<EventBox>>,
65    ) -> Result<(), Self::Error> {
66        (**self).on_event_owned(event, source).await
67    }
68    async fn on_event_ref(
69        &self,
70        event: &EVT,
71        source: Option<Arc<EventBox>>,
72    ) -> Result<(), Self::Error> {
73        (**self).on_event_ref(event, source).await
74    }
75}
76
77#[async_trait]
78impl<EVT: Send + Sync + 'static, T: EventSink<EVT> + Send + Sync> EventSink<EVT> for RwLock<T> {
79    type Error = T::Error;
80    async fn on_event_owned(
81        &self,
82        event: EVT,
83        source: Option<Arc<EventBox>>,
84    ) -> Result<(), Self::Error> {
85        self.read().await.on_event_owned(event, source).await
86    }
87    async fn on_event_ref(
88        &self,
89        event: &EVT,
90        source: Option<Arc<EventBox>>,
91    ) -> Result<(), Self::Error> {
92        self.read().await.on_event_ref(event, source).await
93    }
94}
95
96/// Connect [EventSource] to [EventSink]: run asynchronous task which reads events from source and
97/// calls [EventSink::on_event_ref] on sink object. Source may provide events for multiple readers, so
98/// only references to events are available from it.
99pub fn spawn_event_pipe<
100    EVT: Send + Sync + Unpin + 'static,
101    E,
102    SPAWNER: Spawn,
103    SOURCE: EventSource<EVT>,
104    SINK: EventSink<EVT, Error = E> + Send + Sync + 'static,
105>(
106    spawner: &SPAWNER,
107    source: &SOURCE,
108    sink: SINK,
109    error_handler: impl FnOnce(E) + Send + 'static,
110) -> Result<(), SpawnError> {
111    let mut source = source.event_stream();
112    let process_events = async move {
113        while let Some(event) = source.next().await {
114            let eventref = event.clone();
115            let eventref = &*eventref;
116            sink.on_event_ref(eventref, event.into()).await?;
117        }
118        Result::<(), E>::Ok(())
119    };
120    spawner.spawn(async move {
121        if let Err(e) = process_events.await {
122            error_handler(e)
123        }
124    })
125}
126
127/// Same as [spawn_event_pipe], but also returns handle to task spawned by [futures::task::SpawnExt::spawn_with_handle]
128pub fn spawn_event_pipe_with_handle<
129    EVT: Send + Sync + Unpin + 'static,
130    E,
131    SPAWNER: Spawn,
132    SOURCE: EventSource<EVT> + 'static,
133    SINK: EventSink<EVT, Error = E> + Send + Sync + 'static,
134>(
135    spawner: &SPAWNER,
136    source: &SOURCE,
137    sink: SINK,
138    error_handler: impl FnOnce(E) + Send + 'static,
139) -> Result<RemoteHandle<()>, SpawnError> {
140    let mut source = source.event_stream();
141    let process_events = async move {
142        while let Some(event) = source.next().await {
143            let eventref = event.clone();
144            let eventref = &*eventref;
145            sink.on_event_ref(eventref, event.into()).await?;
146        }
147        Result::<(), E>::Ok(())
148    };
149    spawner.spawn_with_handle(async move {
150        if let Err(e) = process_events.await {
151            error_handler(e)
152        }
153    })
154}
155/// If the event object implements [ToOwned] trait (note that all [Clone] object implements it), [EventSink] implementation
156/// can be simplified by implementing helper [EventSinkExt] with only one event handler accepting [std::borrow::Cow] parameter,
157/// instead of separate handlers for owned and borrowed cases
158///
159/// If structure implements ```EventSinkExt``` the [EventSink] trait can be derived for it:
160/// ```
161/// use std::sync::Arc;
162/// use async_trait::async_trait;
163/// use std::borrow::Cow;
164/// use async_event_streams::{EventBox, EventSink, EventSinkExt};
165/// use async_event_streams_derive::EventSink;
166///
167/// #[derive(EventSink)]
168/// #[event_sink(event=Event)]
169/// struct Sink;
170///
171/// #[derive(Clone)]
172/// struct Event;
173///
174/// #[async_trait]
175/// impl EventSinkExt<Event> for Sink {
176///     type Error = ();
177///     async fn on_event<'a>(&'a self, event: Cow<'a, Event>, source: Option<Arc<EventBox>>) -> Result<(), Self::Error> {
178///         todo!()
179///     }
180///}
181/// ```
182///
183#[async_trait]
184pub trait EventSinkExt<EVT: Send + Sync + 'static + ToOwned<Owned = EVT>> {
185    type Error;
186    async fn on_event<'a>(
187        &'a self,
188        event: Cow<'a, EVT>,
189        source: Option<Arc<EventBox>>,
190    ) -> Result<(), Self::Error>;
191}