task_executor/
builder.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3use std::marker::Unpin;
4
5use futures::channel::mpsc;
6
7use super::{assert_future, default, Executor, Spawner, TaskType};
8
9impl<T: ?Sized> SpawnExt for T where T: futures::Future {}
10
11pub trait SpawnExt: futures::Future {
12    #[inline]
13    fn spawn<Tx, G>(self, exec: &Executor<Tx, G>) -> Spawner<Self, Tx, G, ()>
14        where
15            Self: Sized + Send + 'static,
16            Self::Output: Send + 'static,
17            Tx: Clone + Unpin + futures::Sink<((), TaskType)> + Send + Sync + 'static,
18            G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
19    {
20        let f = Spawner::new(exec, self, ());
21        assert_future::<_, _>(f)
22    }
23
24    #[inline]
25    fn spawn_with<Tx, G, D>(self, exec: &Executor<Tx, G, D>, name: D) -> Spawner<Self, Tx, G, D>
26        where
27            Self: Sized + Send + 'static,
28            Self::Output: Send + 'static,
29            Tx: Clone + Unpin + futures::Sink<(D, TaskType)> + Send + Sync + 'static,
30            G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
31    {
32        let f = Spawner::new(exec, self, name);
33        assert_future::<_, _>(f)
34    }
35}
36
37impl<T: ?Sized> SpawnDefaultExt for T where T: futures::Future {}
38
39pub trait SpawnDefaultExt: futures::Future {
40    #[inline]
41    fn spawn(self) -> Spawner<'static, Self, mpsc::Sender<((), TaskType)>, (), ()>
42        where
43            Self: Sized + Send + 'static,
44            Self::Output: Send + 'static,
45    {
46        let f = Spawner::new(default(), self, ());
47        assert_future::<_, _>(f)
48    }
49}
50
51pub struct Builder {
52    workers: usize,
53    queue_max: usize,
54}
55
56impl Default for Builder {
57    fn default() -> Self {
58        Self {
59            workers: 100,
60            queue_max: 100_000,
61        }
62    }
63}
64
65impl Builder {
66    #[inline]
67    pub fn workers(mut self, workers: usize) -> Self {
68        self.workers = workers;
69        self
70    }
71
72    #[inline]
73    pub fn queue_max(mut self, queue_max: usize) -> Self {
74        self.queue_max = queue_max;
75        self
76    }
77
78    #[inline]
79    pub fn group(self) -> GroupBuilder {
80        GroupBuilder { builder: self }
81    }
82
83    #[inline]
84    pub fn with_channel<Tx, Rx, D>(self, tx: Tx, rx: Rx) -> ChannelBuilder<Tx, Rx, D>
85        where
86            Tx: Clone + futures::Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
87            Rx: futures::Stream<Item=(D, TaskType)> + Unpin,
88    {
89        ChannelBuilder {
90            builder: self,
91            tx,
92            rx,
93            _d: std::marker::PhantomData,
94        }
95    }
96
97    #[inline]
98    pub fn build(self) -> (Executor, impl futures::Future<Output=()>) {
99        let (tx, rx) = mpsc::channel(self.queue_max);
100        Executor::with_channel(self.workers, self.queue_max, tx, rx)
101    }
102}
103
104pub struct GroupBuilder {
105    builder: Builder,
106}
107
108impl GroupBuilder {
109    #[inline]
110    pub fn build<G>(
111        self,
112    ) -> (
113        Executor<mpsc::Sender<((), TaskType)>, G>,
114        impl futures::Future<Output=()>,
115    )
116        where
117            G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
118    {
119        let (tx, rx) = mpsc::channel(self.builder.queue_max);
120        Executor::with_channel(self.builder.workers, self.builder.queue_max, tx, rx)
121    }
122}
123
124pub struct ChannelBuilder<Tx, Rx, D> {
125    builder: Builder,
126    tx: Tx,
127    rx: Rx,
128    _d: std::marker::PhantomData<D>,
129}
130
131impl<Tx, Rx, D> ChannelBuilder<Tx, Rx, D>
132    where
133        Tx: Clone + futures::Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
134        Rx: futures::Stream<Item=(D, TaskType)> + Unpin,
135{
136    #[inline]
137    pub fn build(self) -> (Executor<Tx, (), D>, impl futures::Future<Output=()>) {
138        Executor::with_channel(
139            self.builder.workers,
140            self.builder.queue_max,
141            self.tx,
142            self.rx,
143        )
144    }
145
146    #[inline]
147    pub fn group(self) -> GroupChannelBuilder<Tx, Rx, D> {
148        GroupChannelBuilder { builder: self }
149    }
150}
151
152pub struct GroupChannelBuilder<Tx, Rx, D> {
153    builder: ChannelBuilder<Tx, Rx, D>,
154}
155
156impl<Tx, Rx, D> GroupChannelBuilder<Tx, Rx, D>
157    where
158        Tx: Clone + futures::Sink<((), TaskType)> + Unpin + Send + Sync + 'static,
159        Rx: futures::Stream<Item=((), TaskType)> + Unpin,
160{
161    #[inline]
162    pub fn build<G>(self) -> (Executor<Tx, G>, impl futures::Future<Output=()>)
163        where
164            G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
165    {
166        Executor::with_channel(
167            self.builder.builder.workers,
168            self.builder.builder.queue_max,
169            self.builder.tx,
170            self.builder.rx,
171        )
172    }
173}