Skip to main content

http_type/task/
impl.rs

1use crate::*;
2
3/// Creates a default Task instance.
4impl Default for Task {
5    /// Creates a default Task instance.
6    ///
7    /// # Returns
8    ///
9    /// - `Self`: The default instance.
10    #[inline(always)]
11    fn default() -> Self {
12        let worker_count: usize = Handle::try_current()
13            .map(|handle: Handle| handle.metrics().num_workers())
14            .unwrap_or_default()
15            .max(1);
16        Self::new(worker_count)
17    }
18}
19
20/// Automatically shuts down the task pool when the Task instance is dropped.
21impl Drop for Task {
22    /// Shuts down the task pool when the Task instance is dropped.
23    #[inline(always)]
24    fn drop(&mut self) {
25        self.shutdown();
26    }
27}
28
29impl Task {
30    /// Creates a new Task instance.
31    ///
32    /// # Arguments
33    ///
34    /// - `usize`: The number of worker threads to spawn.
35    ///
36    /// # Returns
37    ///
38    /// - `Self`: The new instance.
39    pub fn new(worker_count: usize) -> Self {
40        let mut pool: Vec<UnboundedSender<AsyncTask>> = Vec::with_capacity(worker_count);
41        let counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
42        let shutdown: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
43        let notifies: Vec<Arc<Notify>> =
44            (0..worker_count).map(|_| Arc::new(Notify::new())).collect();
45        for notify in notifies.iter().take(worker_count) {
46            let (sender, mut receiver): (UnboundedSender<AsyncTask>, UnboundedReceiver<AsyncTask>) =
47                unbounded_channel();
48            pool.push(sender);
49            let shutdown_clone: Arc<AtomicBool> = shutdown.clone();
50            let notify_clone: Arc<Notify> = notify.clone();
51            spawn_blocking(move || {
52                Handle::current().block_on(LocalSet::new().run_until(async move {
53                    loop {
54                        if shutdown_clone.load(atomic::Ordering::Relaxed) {
55                            break;
56                        }
57                        match receiver.try_recv() {
58                            Ok(task) => {
59                                spawn_local(task);
60                            }
61                            Err(_) => {
62                                notify_clone.notified().await;
63                            }
64                        }
65                    }
66                }));
67            });
68        }
69        Self {
70            pool,
71            counter,
72            shutdown,
73            notifies,
74        }
75    }
76
77    /// Attempts to spawn a server task onto the global server task pool.
78    ///
79    /// This function sends the task to one of the worker threads in the pool.
80    /// The worker is selected using a round-robin algorithm based on an atomic counter,
81    /// or a forced index if provided.
82    ///
83    /// # Arguments
84    ///
85    /// - `Option<usize>`: An optional index to force selection of a specific worker.
86    ///   If None, the worker is selected using round-robin distribution.
87    /// - `Future<Output = ()> + Send + 'static`: The future to spawn on the task pool.
88    ///
89    /// # Returns
90    ///
91    /// - `bool`: true if the task was successfully sent, false otherwise.
92    pub fn try_spawn_local<F>(&self, index_opt: Option<usize>, hook: F) -> bool
93    where
94        F: Future<Output = ()> + Send + 'static,
95    {
96        if self.get_pool().is_empty() {
97            return false;
98        }
99        let index: usize = index_opt
100            .unwrap_or(self.get_counter().fetch_add(1, atomic::Ordering::Relaxed))
101            .wrapping_rem(self.get_pool().len());
102        if let Some(sender) = self.get_pool().get(index) {
103            let result: bool = sender.send(Box::pin(hook)).is_ok();
104            if result && let Some(notify) = self.get_notifies().get(index) {
105                notify.notify_one()
106            }
107            return result;
108        }
109        false
110    }
111
112    /// Shuts down the task pool.
113    ///
114    /// This function sets the shutdown flag and wakes all waiting worker threads,
115    /// causing them to exit their processing loops.
116    #[inline(always)]
117    pub fn shutdown(&self) {
118        self.get_shutdown().store(true, atomic::Ordering::Relaxed);
119        self.get_notifies()
120            .iter()
121            .for_each(|notify: &Arc<Notify>| notify.notify_one());
122    }
123}