1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use futures::sync::mpsc;
use tokio::prelude::*;
use super::{CancelReason, Streamer, Consumer, ConsumerEvent, Conduit};
/// Receiving half of an unbounded channel whose items are `ProducerEvent<T>` values.
pub type ProducerEventRx<T> = mpsc::UnboundedReceiver<ProducerEvent<T>>;
/// Sending half of an unbounded channel whose items are `ProducerEvent<T>` values.
pub type ProducerEventTx<T> = mpsc::UnboundedSender<ProducerEvent<T>>;
/// Receiving half of an unbounded channel whose items are `ProducerMessage` values.
pub type ProducerMessageRx = mpsc::UnboundedReceiver<ProducerMessage>;
/// Sending half of an unbounded channel whose items are `ProducerMessage` values.
pub type ProducerMessageTx = mpsc::UnboundedSender<ProducerMessage>;
/// Control message carried over the `ProducerMessageTx` / `ProducerMessageRx` channel pair.
///
/// NOTE(review): nothing in this part of the file sends or receives these messages;
/// the variant descriptions below are read off the payload types and names — confirm
/// against the code that actually uses the channel.
#[derive(Debug)]
pub enum ProducerMessage {
    /// Carries an item count (presumably the number of items being requested).
    Request(usize),
    /// Carries the `CancelReason` associated with a cancellation.
    Cancel(CancelReason),
}
/// Event delivered on a producer's event channel (`ProducerEventRx<T>`).
///
/// The free function `pipe_into` in this file reacts to `Data` by calling the
/// consumer's `write` and to `End` by calling the consumer's `end`.
#[derive(Debug)]
pub enum ProducerEvent<T> {
    /// One produced value of type `T`.
    Data(T),
    /// Marker event without a payload (presumably: no further data will follow).
    End,
}
/// A `Streamer` that emits `ProducerEvent<T>` values over an event channel and can be
/// connected to a `Consumer<T>` or routed through a `Conduit<T, U>`.
pub trait Producer<T>: Streamer
where
    T: Send + 'static,
{
    /// Receives a request for `num_items` items.
    ///
    /// `pipe_into` calls this whenever the consumer emits `ConsumerEvent::Request`.
    /// What an implementor does with the count is not visible here.
    fn request(&mut self, num_items: usize);

    /// Returns the receiver for this producer's events, or `None` when there is none to
    /// hand out (presumably because it was already taken — confirm with implementors).
    fn event_stream(&mut self) -> Option<ProducerEventRx<T>>;

    /// Installs `event_stream` as this producer's event receiver.
    fn set_event_stream(&mut self, event_stream: ProducerEventRx<T>);

    /// Consumes this producer and connects it to `consumer` by delegating to the free
    /// function `pipe_into`.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as the free function `pipe_into`.
    fn pipe_into<C>(self, consumer: C)
    where
        Self: Sized + Send + 'static,
        C: Consumer<T> + Sized + Send + 'static,
        T: Send + 'static,
    {
        // Inside a method body the bare name resolves to the module-level function,
        // not to this method.
        pipe_into(self, consumer)
    }

    /// Splits `conduit` into its consumer and producer halves, pipes this producer into
    /// the consumer half, and returns the producer half so that further stages can be
    /// attached to it.
    fn pipe_through<C, U>(self, conduit: C) -> C::ConcreteProducer
    where
        Self: Sized + Send + 'static,
        C: Conduit<T, U> + Sized + Send + 'static,
        T: Send + 'static,
        U: Send + 'static,
        C::ConcreteConsumer: Send,
    {
        let (sink, source) = conduit.split();
        pipe_into(self, sink);
        source
    }

    /// Wraps this producer's event receiver in a `ProducerEventEmitter`.
    ///
    /// # Panics
    ///
    /// Panics with "event stream" when `event_stream()` returns `None`.
    fn events(&mut self) -> ProducerEventEmitter<T> {
        ProducerEventEmitter::new(self.event_stream().expect("event stream"))
    }
}
/// Holds a producer's event receiver until a callback is attached with `for_each`.
pub struct ProducerEventEmitter<T: Send + 'static> {
    /// `Some` from construction until `for_each` takes the receiver; `None` afterwards.
    event_rx: Option<ProducerEventRx<T>>,
}
impl<T: Send + 'static> ProducerEventEmitter<T> {
    /// Creates an emitter that owns `event_rx`.
    pub fn new(event_rx: ProducerEventRx<T>) -> Self {
        let event_rx = Some(event_rx);
        Self { event_rx }
    }

    /// Takes the stored receiver and spawns a task via `tokio::spawn` that invokes
    /// `callback` once for every event the receiver yields.
    ///
    /// The call returns right after spawning; the callback runs later on the spawned
    /// task. NOTE(review): `tokio::spawn` presumably requires an active Tokio executor
    /// context — confirm for the tokio version in use.
    ///
    /// # Panics
    ///
    /// Panics with "take event_rx" when the receiver has already been taken, i.e. on a
    /// second call on the same emitter.
    pub fn for_each<C: FnMut(ProducerEvent<T>) + Send + 'static>(&mut self, mut callback: C) {
        let receiver = self.event_rx.take().expect("take event_rx");
        let dispatch = receiver.for_each(move |event| {
            callback(event);
            Ok(())
        });
        tokio::spawn(dispatch);
    }
}
/// Connects `producer` to `consumer` by spawning two forwarding tasks.
///
/// * Producer events go to the consumer: `ProducerEvent::Data` is passed to
///   `consumer.write`, `ProducerEvent::End` triggers `consumer.end`.
/// * Consumer events go to the producer: `ConsumerEvent::Request` is passed to
///   `producer.request`, `ConsumerEvent::Cancellation` to `producer.cancel`.
///
/// Both values are moved into the spawned tasks, and the function returns as soon as
/// the tasks are spawned. NOTE(review): `tokio::spawn` presumably requires an active
/// Tokio executor context — confirm for the tokio version in use.
///
/// # Panics
///
/// Panics with "no event stream" when `consumer.event_stream()` returns `None`, and
/// propagates any panic raised by `producer.events()`.
pub fn pipe_into<T, P, C>(mut producer: P, mut consumer: C)
where
    T: Send + 'static,
    P: Producer<T> + Send + 'static,
    C: Consumer<T> + Send + 'static,
{
    // Take the consumer's event receiver first: `consumer` itself is moved into the
    // producer-side callback below.
    let demand_stream = consumer.event_stream().expect("no event stream");

    // Producer -> consumer direction.
    producer.events().for_each(move |produced| match produced {
        ProducerEvent::Data(item) => {
            consumer.write(item);
        }
        ProducerEvent::End => {
            consumer.end();
        }
    });

    // Consumer -> producer direction.
    let upstream = demand_stream
        .for_each(move |signal| {
            match signal {
                ConsumerEvent::Request(num_items) => {
                    producer.request(num_items);
                }
                ConsumerEvent::Cancellation(reason) => {
                    producer.cancel(reason);
                }
            }
            Ok(())
        })
        .map_err(|e| {
            println!("error {:?}", e);
        });
    tokio::spawn(upstream);
}