Skip to main content

http_type/task/
impl.rs

1use crate::*;
2
3/// Creates a default Task instance.
4impl Default for Task {
5    /// Creates a default Task instance.
6    ///
7    /// # Returns
8    ///
9    /// - `Self`: The default instance.
10    #[inline(always)]
11    fn default() -> Self {
12        let worker_count: usize = Handle::try_current()
13            .map(|handle: Handle| handle.metrics().num_workers())
14            .unwrap_or_default()
15            .max(1);
16        Self::new(worker_count)
17    }
18}
19
20/// Automatically shuts down the task pool when the Task instance is dropped.
21impl Drop for Task {
22    /// Shuts down the task pool when the Task instance is dropped.
23    #[inline(always)]
24    fn drop(&mut self) {
25        self.shutdown();
26    }
27}
28
29impl Task {
30    /// Creates a new Task instance.
31    ///
32    /// # Arguments
33    ///
34    /// - `usize`: The number of worker threads to spawn.
35    ///
36    /// # Returns
37    ///
38    /// - `Self`: The new instance.
39    pub fn new(worker_count: usize) -> Self {
40        let mut pool: Vec<UnboundedSender<AsyncTask>> = Vec::with_capacity(worker_count);
41        let counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
42        let shutdown: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
43        let notifies: Vec<Arc<Notify>> = (0..worker_count)
44            .map(|_: usize| Arc::new(Notify::new()))
45            .collect();
46        for notify in notifies.iter().take(worker_count) {
47            let (sender, mut receiver): (UnboundedSender<AsyncTask>, UnboundedReceiver<AsyncTask>) =
48                unbounded_channel();
49            pool.push(sender);
50            let shutdown_clone: Arc<AtomicBool> = shutdown.clone();
51            let notify_clone: Arc<Notify> = notify.clone();
52            spawn_blocking(move || {
53                Handle::current().block_on(LocalSet::new().run_until(async move {
54                    loop {
55                        if shutdown_clone.load(atomic::Ordering::Relaxed) {
56                            break;
57                        }
58                        match receiver.try_recv() {
59                            Ok(task) => {
60                                spawn_local(task);
61                            }
62                            Err(_) => {
63                                notify_clone.notified().await;
64                            }
65                        }
66                    }
67                }));
68            });
69        }
70        Self {
71            pool,
72            counter,
73            shutdown,
74            notifies,
75        }
76    }
77
78    /// Attempts to spawn a server task onto the global server task pool.
79    ///
80    /// This function sends the task to one of the worker threads in the pool.
81    /// The worker is selected using a round-robin algorithm based on an atomic counter,
82    /// or a forced index if provided.
83    ///
84    /// # Arguments
85    ///
86    /// - `Option<usize>`: An optional index to force selection of a specific worker.
87    ///   If None, the worker is selected using round-robin distribution.
88    /// - `Future<Output = ()> + Send + 'static`: The future to spawn on the task pool.
89    ///
90    /// # Returns
91    ///
92    /// - `bool`: true if the task was successfully sent, false otherwise.
93    pub fn try_spawn_local<F>(&self, index_opt: Option<usize>, hook: F) -> bool
94    where
95        F: Future<Output = ()> + Send + 'static,
96    {
97        if self.get_pool().is_empty() {
98            return false;
99        }
100        let index: usize = index_opt
101            .unwrap_or(self.get_counter().fetch_add(1, atomic::Ordering::Relaxed))
102            .wrapping_rem(self.get_pool().len());
103        if let Some(sender) = self.get_pool().get(index) {
104            let result: bool = sender.send(Box::pin(hook)).is_ok();
105            if result && let Some(notify) = self.get_notifies().get(index) {
106                notify.notify_one()
107            }
108            return result;
109        }
110        false
111    }
112
113    /// Shuts down the task pool.
114    ///
115    /// This function sets the shutdown flag and wakes all waiting worker threads,
116    /// causing them to exit their processing loops.
117    #[inline(always)]
118    pub fn shutdown(&self) {
119        self.get_shutdown().store(true, atomic::Ordering::Relaxed);
120        self.get_notifies()
121            .iter()
122            .for_each(|notify: &Arc<Notify>| notify.notify_one());
123    }
124}