task_exec_queue/
builder.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3use std::marker::Unpin;
4
5use super::{assert_future, default, Spawner, TaskExecQueue, TaskType};
6
7impl<T: ?Sized> SpawnExt for T where T: futures::Future {}
8
9pub trait SpawnExt: futures::Future {
10    #[inline]
11    fn spawn<Tx, G>(self, queue: &TaskExecQueue<Tx, G>) -> Spawner<Self, Tx, G, ()>
12    where
13        Self: Sized + Send + 'static,
14        Self::Output: Send + 'static,
15        Tx: Clone + Unpin + futures::Sink<((), TaskType)> + Send + Sync + 'static,
16        G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
17    {
18        let f = Spawner::new(queue, self, ());
19        assert_future::<_, _>(f)
20    }
21
22    #[inline]
23    fn spawn_with<Tx, G, D>(
24        self,
25        queue: &TaskExecQueue<Tx, G, D>,
26        name: D,
27    ) -> Spawner<Self, Tx, G, D>
28    where
29        Self: Sized + Send + 'static,
30        Self::Output: Send + 'static,
31        Tx: Clone + Unpin + futures::Sink<(D, TaskType)> + Send + Sync + 'static,
32        G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
33    {
34        let f = Spawner::new(queue, self, name);
35        assert_future::<_, _>(f)
36    }
37}
38
39impl<T: ?Sized> SpawnDefaultExt for T where T: futures::Future {}
40
41pub trait SpawnDefaultExt: futures::Future {
42    #[inline]
43    fn spawn(self) -> Spawner<'static, Self, futures::channel::mpsc::Sender<((), TaskType)>, (), ()>
44    where
45        Self: Sized + Send + 'static,
46        Self::Output: Send + 'static,
47    {
48        let f = Spawner::new(default(), self, ());
49        assert_future::<_, _>(f)
50    }
51}
52
/// Configuration collected before a `TaskExecQueue` is constructed.
///
/// Both values are forwarded unchanged to `TaskExecQueue::with_channel` by the
/// `build` methods in this file.
#[derive(Debug, Clone)]
pub struct Builder {
    /// Worker count; first argument handed to `TaskExecQueue::with_channel`.
    workers: usize,
    /// Queue limit; second argument handed to `TaskExecQueue::with_channel`,
    /// and also the buffer size given to `futures::channel::mpsc::channel`
    /// when the builder creates the channel itself.
    queue_max: usize,
}
57
58impl Default for Builder {
59    fn default() -> Self {
60        Self {
61            workers: 100,
62            queue_max: 100_000,
63        }
64    }
65}
66
67impl Builder {
68    #[inline]
69    pub fn workers(mut self, workers: usize) -> Self {
70        self.workers = workers;
71        self
72    }
73
74    #[inline]
75    pub fn queue_max(mut self, max: usize) -> Self {
76        self.queue_max = max;
77        self
78    }
79
80    #[inline]
81    pub fn group(self) -> GroupBuilder {
82        GroupBuilder { builder: self }
83    }
84
85    #[inline]
86    pub fn with_channel<Tx, Rx, D>(self, tx: Tx, rx: Rx) -> ChannelBuilder<Tx, Rx, D>
87    where
88        Tx: Clone + futures::Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
89        Rx: futures::Stream<Item = (D, TaskType)> + Unpin,
90    {
91        ChannelBuilder {
92            builder: self,
93            tx,
94            rx,
95            _d: std::marker::PhantomData,
96        }
97    }
98
99    #[inline]
100    pub fn build(self) -> (TaskExecQueue, impl futures::Future<Output = ()>) {
101        let (tx, rx) = futures::channel::mpsc::channel(self.queue_max);
102        TaskExecQueue::with_channel(self.workers, self.queue_max, tx, rx)
103    }
104}
105
106pub struct GroupBuilder {
107    builder: Builder,
108}
109
110impl GroupBuilder {
111    #[inline]
112    pub fn build<G>(
113        self,
114    ) -> (
115        TaskExecQueue<futures::channel::mpsc::Sender<((), TaskType)>, G>,
116        impl futures::Future<Output = ()>,
117    )
118    where
119        G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
120    {
121        let (tx, rx) = futures::channel::mpsc::channel(self.builder.queue_max);
122        TaskExecQueue::with_channel(self.builder.workers, self.builder.queue_max, tx, rx)
123    }
124}
125
126pub struct ChannelBuilder<Tx, Rx, D> {
127    builder: Builder,
128    tx: Tx,
129    rx: Rx,
130    _d: std::marker::PhantomData<D>,
131}
132
133impl<Tx, Rx, D> ChannelBuilder<Tx, Rx, D>
134where
135    Tx: Clone + futures::Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
136    Rx: futures::Stream<Item = (D, TaskType)> + Unpin,
137{
138    #[inline]
139    pub fn build(self) -> (TaskExecQueue<Tx, (), D>, impl futures::Future<Output = ()>) {
140        TaskExecQueue::with_channel(
141            self.builder.workers,
142            self.builder.queue_max,
143            self.tx,
144            self.rx,
145        )
146    }
147
148    #[inline]
149    pub fn group(self) -> GroupChannelBuilder<Tx, Rx, D> {
150        GroupChannelBuilder { builder: self }
151    }
152}
153
154pub struct GroupChannelBuilder<Tx, Rx, D> {
155    builder: ChannelBuilder<Tx, Rx, D>,
156}
157
158impl<Tx, Rx, D> GroupChannelBuilder<Tx, Rx, D>
159where
160    Tx: Clone + futures::Sink<((), TaskType)> + Unpin + Send + Sync + 'static,
161    Rx: futures::Stream<Item = ((), TaskType)> + Unpin,
162{
163    #[inline]
164    pub fn build<G>(self) -> (TaskExecQueue<Tx, G>, impl futures::Future<Output = ()>)
165    where
166        G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
167    {
168        TaskExecQueue::with_channel(
169            self.builder.builder.workers,
170            self.builder.builder.queue_max,
171            self.builder.tx,
172            self.builder.rx,
173        )
174    }
175}