tokio_blocking/
combinators.rs

1use crate::threadpool::{Block, ThreadPool};
2use futures::prelude::*;
3
4pub struct AndThenBlock<Fut, F, T, E> {
5    inner: Fut,
6    block: Option<Block<T, E>>,
7    func: Option<F>,
8    pool: ThreadPool,
9}
10
11pub struct OrElseBlock<Fut, F, T, E> {
12    inner: Fut,
13    block: Option<Block<T, E>>,
14    func: Option<F>,
15    pool: ThreadPool,
16}
17
18pub struct ThenBlock<Fut, F, T, E> {
19    inner: Fut,
20    block: Option<Block<T, E>>,
21    func: Option<F>,
22    pool: ThreadPool,
23}
24
25pub trait FutureBlock: Future + Sized {
26    fn and_then_block<F, T>(self, h: ThreadPool, f: F) -> AndThenBlock<Self, F, T, Self::Error>
27    where
28        F: FnOnce(Self::Item) -> Result<T, Self::Error>,
29        T: Send + 'static,
30    {
31        AndThenBlock {
32            inner: self,
33            block: None,
34            func: Some(f),
35            pool: h,
36        }
37    }
38
39    fn or_else_block<F, E>(self, h: ThreadPool, f: F) -> OrElseBlock<Self, F, Self::Item, E> {
40        OrElseBlock {
41            inner: self,
42            block: None,
43            func: Some(f),
44            pool: h,
45        }
46    }
47
48    fn then_block<F, T, E>(self, h: ThreadPool, f: F) -> ThenBlock<Self, F, T, E>
49    where
50        F: FnOnce(Result<Self::Item, Self::Error>) -> Result<T, E>,
51        T: Send + 'static,
52    {
53        ThenBlock {
54            inner: self,
55            block: None,
56            func: Some(f),
57            pool: h,
58        }
59    }
60}
61
62impl<Fut, F, T> Future for AndThenBlock<Fut, F, T, Fut::Error>
63where
64    Fut: Future,
65    Fut::Item: Send + 'static,
66    Fut::Error: Send + 'static,
67    F: FnOnce(Fut::Item) -> Result<T, Fut::Error> + Send + 'static,
68    T: Send + 'static,
69{
70    type Item = T;
71    type Error = Fut::Error;
72
73    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74        if let Some(block) = self.block.as_mut() {
75            return block.poll();
76        }
77
78        match self.inner.poll() {
79            Ok(Async::Ready(item)) => {
80                self.block = Some(
81                    self.pool
82                        .block(item, self.func.take().expect("polled twice")),
83                );
84
85                self.poll()
86            }
87            Ok(Async::NotReady) => Ok(Async::NotReady),
88            Err(e) => Err(e),
89        }
90    }
91}
92
93impl<Fut, F, E> Future for OrElseBlock<Fut, F, Fut::Item, E>
94where
95    Fut: Future,
96    Fut::Item: Send + 'static,
97    Fut::Error: Send + 'static,
98    F: FnOnce(Fut::Error) -> Result<Fut::Item, E> + Send + 'static,
99    E: Send + 'static,
100{
101    type Item = Fut::Item;
102    type Error = E;
103
104    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
105        if let Some(block) = self.block.as_mut() {
106            return block.poll();
107        }
108
109        match self.inner.poll() {
110            Ok(Async::Ready(item)) => Ok(Async::Ready(item)),
111            Ok(Async::NotReady) => Ok(Async::NotReady),
112            Err(e) => {
113                self.block = Some(self.pool.block(e, self.func.take().expect("polled twice")));
114
115                self.poll()
116            }
117        }
118    }
119}
120
121impl<Fut, F, T, E> Future for ThenBlock<Fut, F, T, E>
122where
123    Fut: Future,
124    Fut::Item: Send + 'static,
125    Fut::Error: Send + 'static,
126    F: FnOnce(Result<Fut::Item, Fut::Error>) -> Result<T, E> + Send + 'static,
127    T: Send + 'static,
128    E: Send + 'static,
129{
130    type Item = T;
131    type Error = E;
132
133    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
134        if let Some(block) = self.block.as_mut() {
135            return block.poll();
136        }
137
138        let result = match self.inner.poll() {
139            Ok(Async::Ready(item)) => Ok(item),
140            Ok(Async::NotReady) => return Ok(Async::NotReady),
141            Err(e) => Err(e),
142        };
143
144        self.block = Some(
145            self.pool
146                .block(result, self.func.take().expect("polled twice")),
147        );
148
149        self.poll()
150    }
151}
152
153impl<T> FutureBlock for T where T: Future {}