task_exec_queue/
local_builder.rs

1use std::fmt::Debug;
2use std::hash::Hash;
3use std::marker::Unpin;
4
5use super::{assert_future, LocalSpawner, LocalTaskExecQueue, LocalTaskType};
6
7impl<T: ?Sized> LocalSpawnExt for T where T: futures::Future {}
8
9pub trait LocalSpawnExt: futures::Future {
10    #[inline]
11    fn spawn<Tx, G>(self, queue: &LocalTaskExecQueue<Tx, G>) -> LocalSpawner<Self, Tx, G, ()>
12    where
13        Self: Sized + 'static,
14        Self::Output: 'static,
15        Tx: Clone + Unpin + futures::Sink<((), LocalTaskType)> + 'static,
16        G: Hash + Eq + Clone + Debug + 'static,
17    {
18        let f = LocalSpawner::new(queue, self, ());
19        assert_future::<_, _>(f)
20    }
21
22    #[inline]
23    fn spawn_with<Tx, G, D>(
24        self,
25        queue: &LocalTaskExecQueue<Tx, G, D>,
26        name: D,
27    ) -> LocalSpawner<Self, Tx, G, D>
28    where
29        Self: Sized + 'static,
30        Self::Output: 'static,
31        Tx: Clone + Unpin + futures::Sink<(D, LocalTaskType)> + 'static,
32        G: Hash + Eq + Clone + Debug + 'static,
33    {
34        let f = LocalSpawner::new(queue, self, name);
35        assert_future::<_, _>(f)
36    }
37}
38
39pub struct LocalBuilder {
40    workers: usize,
41    queue_max: usize,
42}
43
44impl Default for LocalBuilder {
45    fn default() -> Self {
46        Self {
47            workers: 100,
48            queue_max: 100_000,
49        }
50    }
51}
52
53impl LocalBuilder {
54    #[inline]
55    pub fn workers(mut self, workers: usize) -> Self {
56        self.workers = workers;
57        self
58    }
59
60    #[inline]
61    pub fn queue_max(mut self, max: usize) -> Self {
62        self.queue_max = max;
63        self
64    }
65
66    #[inline]
67    pub fn group(self) -> GroupLocalBuilder {
68        GroupLocalBuilder { builder: self }
69    }
70
71    #[inline]
72    pub fn with_channel<Tx, Rx, D>(self, tx: Tx, rx: Rx) -> ChannelLocalBuilder<Tx, Rx, D>
73    where
74        Tx: Clone + futures::Sink<(D, LocalTaskType)> + Unpin + 'static,
75        Rx: futures::Stream<Item = (D, LocalTaskType)> + Unpin,
76    {
77        ChannelLocalBuilder {
78            builder: self,
79            tx,
80            rx,
81            _d: std::marker::PhantomData,
82        }
83    }
84
85    #[inline]
86    pub fn build(self) -> (LocalTaskExecQueue, impl futures::Future<Output = ()>) {
87        let (tx, rx) = futures::channel::mpsc::channel(self.queue_max);
88        LocalTaskExecQueue::with_channel(self.workers, self.queue_max, LocalSender::new(tx), rx)
89    }
90}
91
92pub struct GroupLocalBuilder {
93    builder: LocalBuilder,
94}
95
96impl GroupLocalBuilder {
97    #[inline]
98    pub fn build<G>(
99        self,
100    ) -> (
101        LocalTaskExecQueue<futures::channel::mpsc::Sender<((), LocalTaskType)>, G>,
102        impl futures::Future<Output = ()>,
103    )
104    where
105        G: Hash + Eq + Clone + Debug + 'static,
106    {
107        let (tx, rx) = futures::channel::mpsc::channel(self.builder.queue_max);
108        LocalTaskExecQueue::with_channel(self.builder.workers, self.builder.queue_max, tx, rx)
109    }
110}
111
112pub struct ChannelLocalBuilder<Tx, Rx, D> {
113    builder: LocalBuilder,
114    tx: Tx,
115    rx: Rx,
116    _d: std::marker::PhantomData<D>,
117}
118
119impl<Tx, Rx, D> ChannelLocalBuilder<Tx, Rx, D>
120where
121    Tx: Clone + futures::Sink<(D, LocalTaskType)> + Unpin + 'static,
122    Rx: futures::Stream<Item = (D, LocalTaskType)> + Unpin,
123{
124    #[inline]
125    pub fn build(
126        self,
127    ) -> (
128        LocalTaskExecQueue<Tx, (), D>,
129        impl futures::Future<Output = ()>,
130    ) {
131        LocalTaskExecQueue::with_channel(
132            self.builder.workers,
133            self.builder.queue_max,
134            self.tx,
135            self.rx,
136        )
137    }
138
139    #[inline]
140    pub fn group(self) -> GroupChannelLocalBuilder<Tx, Rx, D> {
141        GroupChannelLocalBuilder { builder: self }
142    }
143}
144
145pub struct GroupChannelLocalBuilder<Tx, Rx, D> {
146    builder: ChannelLocalBuilder<Tx, Rx, D>,
147}
148
149impl<Tx, Rx, D> GroupChannelLocalBuilder<Tx, Rx, D>
150where
151    Tx: Clone + futures::Sink<((), LocalTaskType)> + Unpin + 'static,
152    Rx: futures::Stream<Item = ((), LocalTaskType)> + Unpin,
153{
154    #[inline]
155    pub fn build<G>(self) -> (LocalTaskExecQueue<Tx, G>, impl futures::Future<Output = ()>)
156    where
157        G: Hash + Eq + Clone + Debug + 'static,
158    {
159        LocalTaskExecQueue::with_channel(
160            self.builder.builder.workers,
161            self.builder.builder.queue_max,
162            self.builder.tx,
163            self.builder.rx,
164        )
165    }
166}
167
168pub type LocalSender<D, E> = mpsc::LocalSender<(D, LocalTaskType), E>;