1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#![deny(missing_docs)]
// Generates the `LazyQueue` / `QueueProcessor` pair for one channel flavour.
//
// Arguments, in order (note the mandatory trailing comma after the last one):
// * `$doc`       - documentation string attached to the generated `LazyQueue`.
// * `$sender`    - type of the sending half stored inside `LazyQueue`. It must
//                  be `Clone`, because the generated `Clone` impl clones it.
// * `$receiver`  - type of the receiving half handed to `StreamProcessor`.
// * `($name: $ty, ...)` - extra parameters appended to `LazyQueue::new`, so that
//                  `$make_chan` can refer to them (e.g. a channel capacity).
// * `$make_chan` - expression evaluating to a `(sender, receiver)` tuple.
//
// `$sender` and `$receiver` are expected to mention the `Item` type parameter
// of the generated types.
//
// Every path in the expansion is absolute (`::std::...` / `crate::...`), so the
// invoking module does not have to import anything for the paths to resolve.
macro_rules! implement {
    (
        $doc:literal,
        $sender:ty,
        $receiver:ty,
        ($($new_args:ident: $new_args_ty:ty),*),
        $make_chan:expr,
    ) => {
        #[doc = $doc]
        pub struct LazyQueue<Item> {
            sender: $sender,
        }

        // Implemented by hand rather than derived: `#[derive(Clone)]` would add
        // an `Item: Clone` bound, while only the sender itself has to be cloned.
        impl<Item> Clone for LazyQueue<Item> {
            fn clone(&self) -> Self {
                LazyQueue {
                    sender: self.sender.clone(),
                }
            }
        }

        /// Future that drives the processing of a [`LazyQueue`].
        ///
        /// It is created by [`LazyQueue::new`], resolves to `()` and forwards
        /// every poll to the internal stream processor. Like any future, it
        /// does nothing unless it is polled.
        #[must_use = "futures do nothing unless polled"]
        pub struct QueueProcessor<Item, P, I>
        where
            I: ::std::future::Future,
        {
            inner: crate::inner::StreamProcessor<$receiver, P, I>,
        }

        impl<Item> LazyQueue<Item> {
            /// Creates a queue handle together with the future that processes it.
            ///
            /// `processor` maps an item to a future; it is handed, along with
            /// the receiving half of a freshly created channel, to the internal
            /// stream processor, which is expected to invoke it for the items
            /// sent through the queue. Any remaining arguments are used to
            /// construct the underlying channel.
            ///
            /// The returned [`QueueProcessor`] has to be polled (awaited or
            /// spawned on an executor) for any processing to take place.
            pub fn new<F, I>(
                processor: F,
                $($new_args: $new_args_ty),*
            ) -> (Self, QueueProcessor<Item, F, I>)
            where
                F: FnMut(Item) -> I,
                I: ::std::future::Future,
            {
                let (sender, receiver) = $make_chan;
                (
                    LazyQueue { sender },
                    QueueProcessor {
                        // Fully qualified so that the macro does not depend on
                        // `StreamProcessor` being imported at the call site.
                        inner: crate::inner::StreamProcessor::new(receiver, processor),
                    },
                )
            }
        }

        impl<Item, P, I> ::std::future::Future for QueueProcessor<Item, P, I>
        where
            P: FnMut(Item) -> I,
            I: ::std::future::Future,
        {
            type Output = ();

            fn poll(
                self: ::std::pin::Pin<&mut Self>,
                ctx: &mut ::std::task::Context<'_>,
            ) -> ::std::task::Poll<Self::Output> {
                // SAFETY: this is a structural pin projection onto `inner`.
                // The closure only reborrows the field and never moves out of
                // it, this macro generates no `Drop` impl and no
                // `#[repr(packed)]` for `QueueProcessor`, and no manual `Unpin`
                // impl is generated, so `QueueProcessor` is `Unpin` only when
                // all of its fields are.
                let inner = unsafe { self.map_unchecked_mut(|this| &mut this.inner) };
                inner.poll(ctx)
            }
        }
    };
}
mod inner;
mod sync;
pub use crate::sync::{bounded, unbounded};