1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
use std::sync::Arc;
use async_trait::async_trait;
use futures::{Future, FutureExt, sink::{Sink}, stream::{Stream, StreamExt}};
use super::{Running, Context};
#[async_trait]
pub trait Broker: Sized {
type Item: Send + 'static;
type WriterItem: Send + 'static;
type Ok: Send;
type Error: std::error::Error + Send;
async fn op<W>(
&mut self,
ctx: &Arc<Context<Self::Item>>,
item: Self::Item,
writer: W,
) -> Running<Result<Self::Ok, Self::Error>, Option<Self::Error>>
where W: Sink<Self::WriterItem, Error = flume::SendError<Self::WriterItem>> + Send + Unpin;
async fn handle_result(res: Result<Self::Ok, Self::Error>) -> Running<(), Option<Self::Error>> {
if let Err(_err) = res {
#[cfg(feature = "debug")]
log::error!("{:?}", _err);
}
Running::Continue(())
}
async fn broker_loop<S, W, RH, WH>(
mut self,
mut items: S,
mut writer: W,
ctx: Arc<Context<Self::Item>>,
stop: flume::Receiver<()>,
reader_handle: RH,
writer_handle: WH
) -> Result<(), Self::Error>
where
S: Stream<Item = Self::Item> + Send + Unpin,
W: Sink<Self::WriterItem, Error = flume::SendError<Self::WriterItem>> + Send + Unpin,
RH: Future + Send,
WH: Future + Send,
{
let this = &mut self;
let f = Self::handle_result;
loop {
futures::select! {
_ = stop.recv_async() => {
break;
},
item = items.next().fuse() => {
if let Some(item) = item {
match this.op(&ctx, item, &mut writer).await {
Running::Continue(res) => {
match f(res).await {
Running::Continue(_) => { },
Running::Stop(e) => {
match e {
None => return Ok(()),
Some(err) => return Err(err),
}
}
}
},
Running::Stop(e) => {
match e {
None => return Ok(()),
Some(err) => return Err(err)
}
}
}
}
}
}
}
#[cfg(feature = "debug")]
log::debug!("Dropping writer");
drop(writer);
#[cfg(feature = "debug")]
log::debug!(".await writer handle");
let _ = writer_handle.await;
if !ctx.reader_stop.is_disconnected() {
if ctx.reader_stop.send_async(()).await.is_ok() {
let _ = reader_handle.await;
}
}
#[cfg(feature = "debug")]
log::debug!("Exiting broker loop");
Ok(())
}
}