1use std::fmt::Debug;
2use std::hash::Hash;
3use std::marker::Unpin;
4
5use super::{assert_future, LocalSpawner, LocalTaskExecQueue, LocalTaskType};
6
7impl<T: ?Sized> LocalSpawnExt for T where T: futures::Future {}
8
9pub trait LocalSpawnExt: futures::Future {
10 #[inline]
11 fn spawn<Tx, G>(self, queue: &LocalTaskExecQueue<Tx, G>) -> LocalSpawner<Self, Tx, G, ()>
12 where
13 Self: Sized + 'static,
14 Self::Output: 'static,
15 Tx: Clone + Unpin + futures::Sink<((), LocalTaskType)> + 'static,
16 G: Hash + Eq + Clone + Debug + 'static,
17 {
18 let f = LocalSpawner::new(queue, self, ());
19 assert_future::<_, _>(f)
20 }
21
22 #[inline]
23 fn spawn_with<Tx, G, D>(
24 self,
25 queue: &LocalTaskExecQueue<Tx, G, D>,
26 name: D,
27 ) -> LocalSpawner<Self, Tx, G, D>
28 where
29 Self: Sized + 'static,
30 Self::Output: 'static,
31 Tx: Clone + Unpin + futures::Sink<(D, LocalTaskType)> + 'static,
32 G: Hash + Eq + Clone + Debug + 'static,
33 {
34 let f = LocalSpawner::new(queue, self, name);
35 assert_future::<_, _>(f)
36 }
37}
38
39pub struct LocalBuilder {
40 workers: usize,
41 queue_max: usize,
42}
43
44impl Default for LocalBuilder {
45 fn default() -> Self {
46 Self {
47 workers: 100,
48 queue_max: 100_000,
49 }
50 }
51}
52
53impl LocalBuilder {
54 #[inline]
55 pub fn workers(mut self, workers: usize) -> Self {
56 self.workers = workers;
57 self
58 }
59
60 #[inline]
61 pub fn queue_max(mut self, max: usize) -> Self {
62 self.queue_max = max;
63 self
64 }
65
66 #[inline]
67 pub fn group(self) -> GroupLocalBuilder {
68 GroupLocalBuilder { builder: self }
69 }
70
71 #[inline]
72 pub fn with_channel<Tx, Rx, D>(self, tx: Tx, rx: Rx) -> ChannelLocalBuilder<Tx, Rx, D>
73 where
74 Tx: Clone + futures::Sink<(D, LocalTaskType)> + Unpin + 'static,
75 Rx: futures::Stream<Item = (D, LocalTaskType)> + Unpin,
76 {
77 ChannelLocalBuilder {
78 builder: self,
79 tx,
80 rx,
81 _d: std::marker::PhantomData,
82 }
83 }
84
85 #[inline]
86 pub fn build(self) -> (LocalTaskExecQueue, impl futures::Future<Output = ()>) {
87 let (tx, rx) = futures::channel::mpsc::channel(self.queue_max);
88 LocalTaskExecQueue::with_channel(self.workers, self.queue_max, LocalSender::new(tx), rx)
89 }
90}
91
92pub struct GroupLocalBuilder {
93 builder: LocalBuilder,
94}
95
96impl GroupLocalBuilder {
97 #[inline]
98 pub fn build<G>(
99 self,
100 ) -> (
101 LocalTaskExecQueue<futures::channel::mpsc::Sender<((), LocalTaskType)>, G>,
102 impl futures::Future<Output = ()>,
103 )
104 where
105 G: Hash + Eq + Clone + Debug + 'static,
106 {
107 let (tx, rx) = futures::channel::mpsc::channel(self.builder.queue_max);
108 LocalTaskExecQueue::with_channel(self.builder.workers, self.builder.queue_max, tx, rx)
109 }
110}
111
112pub struct ChannelLocalBuilder<Tx, Rx, D> {
113 builder: LocalBuilder,
114 tx: Tx,
115 rx: Rx,
116 _d: std::marker::PhantomData<D>,
117}
118
119impl<Tx, Rx, D> ChannelLocalBuilder<Tx, Rx, D>
120where
121 Tx: Clone + futures::Sink<(D, LocalTaskType)> + Unpin + 'static,
122 Rx: futures::Stream<Item = (D, LocalTaskType)> + Unpin,
123{
124 #[inline]
125 pub fn build(
126 self,
127 ) -> (
128 LocalTaskExecQueue<Tx, (), D>,
129 impl futures::Future<Output = ()>,
130 ) {
131 LocalTaskExecQueue::with_channel(
132 self.builder.workers,
133 self.builder.queue_max,
134 self.tx,
135 self.rx,
136 )
137 }
138
139 #[inline]
140 pub fn group(self) -> GroupChannelLocalBuilder<Tx, Rx, D> {
141 GroupChannelLocalBuilder { builder: self }
142 }
143}
144
145pub struct GroupChannelLocalBuilder<Tx, Rx, D> {
146 builder: ChannelLocalBuilder<Tx, Rx, D>,
147}
148
149impl<Tx, Rx, D> GroupChannelLocalBuilder<Tx, Rx, D>
150where
151 Tx: Clone + futures::Sink<((), LocalTaskType)> + Unpin + 'static,
152 Rx: futures::Stream<Item = ((), LocalTaskType)> + Unpin,
153{
154 #[inline]
155 pub fn build<G>(self) -> (LocalTaskExecQueue<Tx, G>, impl futures::Future<Output = ()>)
156 where
157 G: Hash + Eq + Clone + Debug + 'static,
158 {
159 LocalTaskExecQueue::with_channel(
160 self.builder.builder.workers,
161 self.builder.builder.queue_max,
162 self.builder.tx,
163 self.builder.rx,
164 )
165 }
166}
167
168pub type LocalSender<D, E> = mpsc::LocalSender<(D, LocalTaskType), E>;