1use std::fmt::Debug;
2use std::hash::Hash;
3use std::marker::Unpin;
4
5use futures::channel::mpsc;
6
7use super::{assert_future, default, Executor, Spawner, TaskType};
8
9impl<T: ?Sized> SpawnExt for T where T: futures::Future {}
10
11pub trait SpawnExt: futures::Future {
12 #[inline]
13 fn spawn<Tx, G>(self, exec: &Executor<Tx, G>) -> Spawner<Self, Tx, G, ()>
14 where
15 Self: Sized + Send + 'static,
16 Self::Output: Send + 'static,
17 Tx: Clone + Unpin + futures::Sink<((), TaskType)> + Send + Sync + 'static,
18 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
19 {
20 let f = Spawner::new(exec, self, ());
21 assert_future::<_, _>(f)
22 }
23
24 #[inline]
25 fn spawn_with<Tx, G, D>(self, exec: &Executor<Tx, G, D>, name: D) -> Spawner<Self, Tx, G, D>
26 where
27 Self: Sized + Send + 'static,
28 Self::Output: Send + 'static,
29 Tx: Clone + Unpin + futures::Sink<(D, TaskType)> + Send + Sync + 'static,
30 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
31 {
32 let f = Spawner::new(exec, self, name);
33 assert_future::<_, _>(f)
34 }
35}
36
37impl<T: ?Sized> SpawnDefaultExt for T where T: futures::Future {}
38
39pub trait SpawnDefaultExt: futures::Future {
40 #[inline]
41 fn spawn(self) -> Spawner<'static, Self, mpsc::Sender<((), TaskType)>, (), ()>
42 where
43 Self: Sized + Send + 'static,
44 Self::Output: Send + 'static,
45 {
46 let f = Spawner::new(default(), self, ());
47 assert_future::<_, _>(f)
48 }
49}
50
51pub struct Builder {
52 workers: usize,
53 queue_max: usize,
54}
55
56impl Default for Builder {
57 fn default() -> Self {
58 Self {
59 workers: 100,
60 queue_max: 100_000,
61 }
62 }
63}
64
65impl Builder {
66 #[inline]
67 pub fn workers(mut self, workers: usize) -> Self {
68 self.workers = workers;
69 self
70 }
71
72 #[inline]
73 pub fn queue_max(mut self, queue_max: usize) -> Self {
74 self.queue_max = queue_max;
75 self
76 }
77
78 #[inline]
79 pub fn group(self) -> GroupBuilder {
80 GroupBuilder { builder: self }
81 }
82
83 #[inline]
84 pub fn with_channel<Tx, Rx, D>(self, tx: Tx, rx: Rx) -> ChannelBuilder<Tx, Rx, D>
85 where
86 Tx: Clone + futures::Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
87 Rx: futures::Stream<Item=(D, TaskType)> + Unpin,
88 {
89 ChannelBuilder {
90 builder: self,
91 tx,
92 rx,
93 _d: std::marker::PhantomData,
94 }
95 }
96
97 #[inline]
98 pub fn build(self) -> (Executor, impl futures::Future<Output=()>) {
99 let (tx, rx) = mpsc::channel(self.queue_max);
100 Executor::with_channel(self.workers, self.queue_max, tx, rx)
101 }
102}
103
104pub struct GroupBuilder {
105 builder: Builder,
106}
107
108impl GroupBuilder {
109 #[inline]
110 pub fn build<G>(
111 self,
112 ) -> (
113 Executor<mpsc::Sender<((), TaskType)>, G>,
114 impl futures::Future<Output=()>,
115 )
116 where
117 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
118 {
119 let (tx, rx) = mpsc::channel(self.builder.queue_max);
120 Executor::with_channel(self.builder.workers, self.builder.queue_max, tx, rx)
121 }
122}
123
124pub struct ChannelBuilder<Tx, Rx, D> {
125 builder: Builder,
126 tx: Tx,
127 rx: Rx,
128 _d: std::marker::PhantomData<D>,
129}
130
131impl<Tx, Rx, D> ChannelBuilder<Tx, Rx, D>
132 where
133 Tx: Clone + futures::Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
134 Rx: futures::Stream<Item=(D, TaskType)> + Unpin,
135{
136 #[inline]
137 pub fn build(self) -> (Executor<Tx, (), D>, impl futures::Future<Output=()>) {
138 Executor::with_channel(
139 self.builder.workers,
140 self.builder.queue_max,
141 self.tx,
142 self.rx,
143 )
144 }
145
146 #[inline]
147 pub fn group(self) -> GroupChannelBuilder<Tx, Rx, D> {
148 GroupChannelBuilder { builder: self }
149 }
150}
151
152pub struct GroupChannelBuilder<Tx, Rx, D> {
153 builder: ChannelBuilder<Tx, Rx, D>,
154}
155
156impl<Tx, Rx, D> GroupChannelBuilder<Tx, Rx, D>
157 where
158 Tx: Clone + futures::Sink<((), TaskType)> + Unpin + Send + Sync + 'static,
159 Rx: futures::Stream<Item=((), TaskType)> + Unpin,
160{
161 #[inline]
162 pub fn build<G>(self) -> (Executor<Tx, G>, impl futures::Future<Output=()>)
163 where
164 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
165 {
166 Executor::with_channel(
167 self.builder.builder.workers,
168 self.builder.builder.queue_max,
169 self.builder.tx,
170 self.builder.rx,
171 )
172 }
173}