1use std::fmt::Debug;
2use std::hash::Hash;
3use std::marker::Unpin;
4
5use super::{assert_future, default, Spawner, TaskExecQueue, TaskType};
6
7impl<T: ?Sized> SpawnExt for T where T: futures::Future {}
8
9pub trait SpawnExt: futures::Future {
10 #[inline]
11 fn spawn<Tx, G>(self, queue: &TaskExecQueue<Tx, G>) -> Spawner<Self, Tx, G, ()>
12 where
13 Self: Sized + Send + 'static,
14 Self::Output: Send + 'static,
15 Tx: Clone + Unpin + futures::Sink<((), TaskType)> + Send + Sync + 'static,
16 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
17 {
18 let f = Spawner::new(queue, self, ());
19 assert_future::<_, _>(f)
20 }
21
22 #[inline]
23 fn spawn_with<Tx, G, D>(
24 self,
25 queue: &TaskExecQueue<Tx, G, D>,
26 name: D,
27 ) -> Spawner<Self, Tx, G, D>
28 where
29 Self: Sized + Send + 'static,
30 Self::Output: Send + 'static,
31 Tx: Clone + Unpin + futures::Sink<(D, TaskType)> + Send + Sync + 'static,
32 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
33 {
34 let f = Spawner::new(queue, self, name);
35 assert_future::<_, _>(f)
36 }
37}
38
39impl<T: ?Sized> SpawnDefaultExt for T where T: futures::Future {}
40
41pub trait SpawnDefaultExt: futures::Future {
42 #[inline]
43 fn spawn(self) -> Spawner<'static, Self, futures::channel::mpsc::Sender<((), TaskType)>, (), ()>
44 where
45 Self: Sized + Send + 'static,
46 Self::Output: Send + 'static,
47 {
48 let f = Spawner::new(default(), self, ());
49 assert_future::<_, _>(f)
50 }
51}
52
53pub struct Builder {
54 workers: usize,
55 queue_max: usize,
56}
57
58impl Default for Builder {
59 fn default() -> Self {
60 Self {
61 workers: 100,
62 queue_max: 100_000,
63 }
64 }
65}
66
67impl Builder {
68 #[inline]
69 pub fn workers(mut self, workers: usize) -> Self {
70 self.workers = workers;
71 self
72 }
73
74 #[inline]
75 pub fn queue_max(mut self, max: usize) -> Self {
76 self.queue_max = max;
77 self
78 }
79
80 #[inline]
81 pub fn group(self) -> GroupBuilder {
82 GroupBuilder { builder: self }
83 }
84
85 #[inline]
86 pub fn with_channel<Tx, Rx, D>(self, tx: Tx, rx: Rx) -> ChannelBuilder<Tx, Rx, D>
87 where
88 Tx: Clone + futures::Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
89 Rx: futures::Stream<Item = (D, TaskType)> + Unpin,
90 {
91 ChannelBuilder {
92 builder: self,
93 tx,
94 rx,
95 _d: std::marker::PhantomData,
96 }
97 }
98
99 #[inline]
100 pub fn build(self) -> (TaskExecQueue, impl futures::Future<Output = ()>) {
101 let (tx, rx) = futures::channel::mpsc::channel(self.queue_max);
102 TaskExecQueue::with_channel(self.workers, self.queue_max, tx, rx)
103 }
104}
105
106pub struct GroupBuilder {
107 builder: Builder,
108}
109
110impl GroupBuilder {
111 #[inline]
112 pub fn build<G>(
113 self,
114 ) -> (
115 TaskExecQueue<futures::channel::mpsc::Sender<((), TaskType)>, G>,
116 impl futures::Future<Output = ()>,
117 )
118 where
119 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
120 {
121 let (tx, rx) = futures::channel::mpsc::channel(self.builder.queue_max);
122 TaskExecQueue::with_channel(self.builder.workers, self.builder.queue_max, tx, rx)
123 }
124}
125
126pub struct ChannelBuilder<Tx, Rx, D> {
127 builder: Builder,
128 tx: Tx,
129 rx: Rx,
130 _d: std::marker::PhantomData<D>,
131}
132
133impl<Tx, Rx, D> ChannelBuilder<Tx, Rx, D>
134where
135 Tx: Clone + futures::Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
136 Rx: futures::Stream<Item = (D, TaskType)> + Unpin,
137{
138 #[inline]
139 pub fn build(self) -> (TaskExecQueue<Tx, (), D>, impl futures::Future<Output = ()>) {
140 TaskExecQueue::with_channel(
141 self.builder.workers,
142 self.builder.queue_max,
143 self.tx,
144 self.rx,
145 )
146 }
147
148 #[inline]
149 pub fn group(self) -> GroupChannelBuilder<Tx, Rx, D> {
150 GroupChannelBuilder { builder: self }
151 }
152}
153
154pub struct GroupChannelBuilder<Tx, Rx, D> {
155 builder: ChannelBuilder<Tx, Rx, D>,
156}
157
158impl<Tx, Rx, D> GroupChannelBuilder<Tx, Rx, D>
159where
160 Tx: Clone + futures::Sink<((), TaskType)> + Unpin + Send + Sync + 'static,
161 Rx: futures::Stream<Item = ((), TaskType)> + Unpin,
162{
163 #[inline]
164 pub fn build<G>(self) -> (TaskExecQueue<Tx, G>, impl futures::Future<Output = ()>)
165 where
166 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
167 {
168 TaskExecQueue::with_channel(
169 self.builder.builder.workers,
170 self.builder.builder.queue_max,
171 self.builder.tx,
172 self.builder.rx,
173 )
174 }
175}