async_event_streams/
pipes.rs1use async_std::{stream::StreamExt, sync::RwLock};
2use async_trait::async_trait;
3use futures::{
4 future::RemoteHandle,
5 task::{Spawn, SpawnError, SpawnExt},
6};
7use std::{borrow::Cow, sync::Arc};
8
9use crate::{EventBox, EventStream};
10
11pub trait EventSource<EVT: Send + Sync + 'static> {
29 fn event_stream(&self) -> EventStream<EVT>;
30}
31
32impl<EVT: Send + Sync + 'static, T: EventSource<EVT>> EventSource<EVT> for Arc<T> {
33 fn event_stream(&self) -> EventStream<EVT> {
34 (**self).event_stream()
35 }
36}
37
38#[async_trait]
44pub trait EventSink<EVT: Send + Sync + 'static>: Send + Sync {
45 type Error;
46 async fn on_event_owned(
47 &self,
48 event: EVT,
49 source: Option<Arc<EventBox>>,
50 ) -> Result<(), Self::Error>;
51 async fn on_event_ref(
52 &self,
53 event: &EVT,
54 source: Option<Arc<EventBox>>,
55 ) -> Result<(), Self::Error>;
56}
57
58#[async_trait]
59impl<EVT: Send + Sync + 'static, T: EventSink<EVT> + Send + Sync> EventSink<EVT> for Arc<T> {
60 type Error = T::Error;
61 async fn on_event_owned(
62 &self,
63 event: EVT,
64 source: Option<Arc<EventBox>>,
65 ) -> Result<(), Self::Error> {
66 (**self).on_event_owned(event, source).await
67 }
68 async fn on_event_ref(
69 &self,
70 event: &EVT,
71 source: Option<Arc<EventBox>>,
72 ) -> Result<(), Self::Error> {
73 (**self).on_event_ref(event, source).await
74 }
75}
76
77#[async_trait]
78impl<EVT: Send + Sync + 'static, T: EventSink<EVT> + Send + Sync> EventSink<EVT> for RwLock<T> {
79 type Error = T::Error;
80 async fn on_event_owned(
81 &self,
82 event: EVT,
83 source: Option<Arc<EventBox>>,
84 ) -> Result<(), Self::Error> {
85 self.read().await.on_event_owned(event, source).await
86 }
87 async fn on_event_ref(
88 &self,
89 event: &EVT,
90 source: Option<Arc<EventBox>>,
91 ) -> Result<(), Self::Error> {
92 self.read().await.on_event_ref(event, source).await
93 }
94}
95
96pub fn spawn_event_pipe<
100 EVT: Send + Sync + Unpin + 'static,
101 E,
102 SPAWNER: Spawn,
103 SOURCE: EventSource<EVT>,
104 SINK: EventSink<EVT, Error = E> + Send + Sync + 'static,
105>(
106 spawner: &SPAWNER,
107 source: &SOURCE,
108 sink: SINK,
109 error_handler: impl FnOnce(E) + Send + 'static,
110) -> Result<(), SpawnError> {
111 let mut source = source.event_stream();
112 let process_events = async move {
113 while let Some(event) = source.next().await {
114 let eventref = event.clone();
115 let eventref = &*eventref;
116 sink.on_event_ref(eventref, event.into()).await?;
117 }
118 Result::<(), E>::Ok(())
119 };
120 spawner.spawn(async move {
121 if let Err(e) = process_events.await {
122 error_handler(e)
123 }
124 })
125}
126
127pub fn spawn_event_pipe_with_handle<
129 EVT: Send + Sync + Unpin + 'static,
130 E,
131 SPAWNER: Spawn,
132 SOURCE: EventSource<EVT> + 'static,
133 SINK: EventSink<EVT, Error = E> + Send + Sync + 'static,
134>(
135 spawner: &SPAWNER,
136 source: &SOURCE,
137 sink: SINK,
138 error_handler: impl FnOnce(E) + Send + 'static,
139) -> Result<RemoteHandle<()>, SpawnError> {
140 let mut source = source.event_stream();
141 let process_events = async move {
142 while let Some(event) = source.next().await {
143 let eventref = event.clone();
144 let eventref = &*eventref;
145 sink.on_event_ref(eventref, event.into()).await?;
146 }
147 Result::<(), E>::Ok(())
148 };
149 spawner.spawn_with_handle(async move {
150 if let Err(e) = process_events.await {
151 error_handler(e)
152 }
153 })
154}
155#[async_trait]
184pub trait EventSinkExt<EVT: Send + Sync + 'static + ToOwned<Owned = EVT>> {
185 type Error;
186 async fn on_event<'a>(
187 &'a self,
188 event: Cow<'a, EVT>,
189 source: Option<Arc<EventBox>>,
190 ) -> Result<(), Self::Error>;
191}