1use crate::threadpool::{Block, ThreadPool};
2use futures::prelude::*;
3
4pub struct AndThenBlock<Fut, F, T, E> {
5 inner: Fut,
6 block: Option<Block<T, E>>,
7 func: Option<F>,
8 pool: ThreadPool,
9}
10
11pub struct OrElseBlock<Fut, F, T, E> {
12 inner: Fut,
13 block: Option<Block<T, E>>,
14 func: Option<F>,
15 pool: ThreadPool,
16}
17
18pub struct ThenBlock<Fut, F, T, E> {
19 inner: Fut,
20 block: Option<Block<T, E>>,
21 func: Option<F>,
22 pool: ThreadPool,
23}
24
25pub trait FutureBlock: Future + Sized {
26 fn and_then_block<F, T>(self, h: ThreadPool, f: F) -> AndThenBlock<Self, F, T, Self::Error>
27 where
28 F: FnOnce(Self::Item) -> Result<T, Self::Error>,
29 T: Send + 'static,
30 {
31 AndThenBlock {
32 inner: self,
33 block: None,
34 func: Some(f),
35 pool: h,
36 }
37 }
38
39 fn or_else_block<F, E>(self, h: ThreadPool, f: F) -> OrElseBlock<Self, F, Self::Item, E> {
40 OrElseBlock {
41 inner: self,
42 block: None,
43 func: Some(f),
44 pool: h,
45 }
46 }
47
48 fn then_block<F, T, E>(self, h: ThreadPool, f: F) -> ThenBlock<Self, F, T, E>
49 where
50 F: FnOnce(Result<Self::Item, Self::Error>) -> Result<T, E>,
51 T: Send + 'static,
52 {
53 ThenBlock {
54 inner: self,
55 block: None,
56 func: Some(f),
57 pool: h,
58 }
59 }
60}
61
62impl<Fut, F, T> Future for AndThenBlock<Fut, F, T, Fut::Error>
63where
64 Fut: Future,
65 Fut::Item: Send + 'static,
66 Fut::Error: Send + 'static,
67 F: FnOnce(Fut::Item) -> Result<T, Fut::Error> + Send + 'static,
68 T: Send + 'static,
69{
70 type Item = T;
71 type Error = Fut::Error;
72
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 if let Some(block) = self.block.as_mut() {
75 return block.poll();
76 }
77
78 match self.inner.poll() {
79 Ok(Async::Ready(item)) => {
80 self.block = Some(
81 self.pool
82 .block(item, self.func.take().expect("polled twice")),
83 );
84
85 self.poll()
86 }
87 Ok(Async::NotReady) => Ok(Async::NotReady),
88 Err(e) => Err(e),
89 }
90 }
91}
92
93impl<Fut, F, E> Future for OrElseBlock<Fut, F, Fut::Item, E>
94where
95 Fut: Future,
96 Fut::Item: Send + 'static,
97 Fut::Error: Send + 'static,
98 F: FnOnce(Fut::Error) -> Result<Fut::Item, E> + Send + 'static,
99 E: Send + 'static,
100{
101 type Item = Fut::Item;
102 type Error = E;
103
104 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
105 if let Some(block) = self.block.as_mut() {
106 return block.poll();
107 }
108
109 match self.inner.poll() {
110 Ok(Async::Ready(item)) => Ok(Async::Ready(item)),
111 Ok(Async::NotReady) => Ok(Async::NotReady),
112 Err(e) => {
113 self.block = Some(self.pool.block(e, self.func.take().expect("polled twice")));
114
115 self.poll()
116 }
117 }
118 }
119}
120
121impl<Fut, F, T, E> Future for ThenBlock<Fut, F, T, E>
122where
123 Fut: Future,
124 Fut::Item: Send + 'static,
125 Fut::Error: Send + 'static,
126 F: FnOnce(Result<Fut::Item, Fut::Error>) -> Result<T, E> + Send + 'static,
127 T: Send + 'static,
128 E: Send + 'static,
129{
130 type Item = T;
131 type Error = E;
132
133 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
134 if let Some(block) = self.block.as_mut() {
135 return block.poll();
136 }
137
138 let result = match self.inner.poll() {
139 Ok(Async::Ready(item)) => Ok(item),
140 Ok(Async::NotReady) => return Ok(Async::NotReady),
141 Err(e) => Err(e),
142 };
143
144 self.block = Some(
145 self.pool
146 .block(result, self.func.take().expect("polled twice")),
147 );
148
149 self.poll()
150 }
151}
152
153impl<T> FutureBlock for T where T: Future {}