http_type/task/impl.rs
1use crate::*;
2
3/// Creates a default Task instance.
4impl Default for Task {
5 /// Creates a default Task instance.
6 ///
7 /// # Returns
8 ///
9 /// - `Self`: The default instance.
10 #[inline(always)]
11 fn default() -> Self {
12 let worker_count: usize = Handle::try_current()
13 .map(|handle: Handle| handle.metrics().num_workers())
14 .unwrap_or_default()
15 .max(1);
16 Self::new(worker_count)
17 }
18}
19
20/// Automatically shuts down the task pool when the Task instance is dropped.
21impl Drop for Task {
22 /// Shuts down the task pool when the Task instance is dropped.
23 #[inline(always)]
24 fn drop(&mut self) {
25 self.shutdown();
26 }
27}
28
29impl Task {
30 /// Creates a new Task instance.
31 ///
32 /// # Arguments
33 ///
34 /// - `usize`: The number of worker threads to spawn.
35 ///
36 /// # Returns
37 ///
38 /// - `Self`: The new instance.
39 pub fn new(worker_count: usize) -> Self {
40 let mut pool: Vec<UnboundedSender<AsyncTask>> = Vec::with_capacity(worker_count);
41 let counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
42 let shutdown: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
43 let notifies: Vec<Arc<Notify>> = (0..worker_count)
44 .map(|_: usize| Arc::new(Notify::new()))
45 .collect();
46 for notify in notifies.iter().take(worker_count) {
47 let (sender, mut receiver): (UnboundedSender<AsyncTask>, UnboundedReceiver<AsyncTask>) =
48 unbounded_channel();
49 pool.push(sender);
50 let shutdown_clone: Arc<AtomicBool> = shutdown.clone();
51 let notify_clone: Arc<Notify> = notify.clone();
52 spawn_blocking(move || {
53 Handle::current().block_on(LocalSet::new().run_until(async move {
54 loop {
55 if shutdown_clone.load(atomic::Ordering::Relaxed) {
56 break;
57 }
58 match receiver.try_recv() {
59 Ok(task) => {
60 spawn_local(task);
61 }
62 Err(_) => {
63 notify_clone.notified().await;
64 }
65 }
66 }
67 }));
68 });
69 }
70 Self {
71 pool,
72 counter,
73 shutdown,
74 notifies,
75 }
76 }
77
78 /// Attempts to spawn a server task onto the global server task pool.
79 ///
80 /// This function sends the task to one of the worker threads in the pool.
81 /// The worker is selected using a round-robin algorithm based on an atomic counter,
82 /// or a forced index if provided.
83 ///
84 /// # Arguments
85 ///
86 /// - `Option<usize>`: An optional index to force selection of a specific worker.
87 /// If None, the worker is selected using round-robin distribution.
88 /// - `Future<Output = ()> + Send + 'static`: The future to spawn on the task pool.
89 ///
90 /// # Returns
91 ///
92 /// - `bool`: true if the task was successfully sent, false otherwise.
93 pub fn try_spawn_local<F>(&self, index_opt: Option<usize>, hook: F) -> bool
94 where
95 F: Future<Output = ()> + Send + 'static,
96 {
97 if self.get_pool().is_empty() {
98 return false;
99 }
100 let index: usize = index_opt
101 .unwrap_or(self.get_counter().fetch_add(1, atomic::Ordering::Relaxed))
102 .wrapping_rem(self.get_pool().len());
103 if let Some(sender) = self.get_pool().get(index) {
104 let result: bool = sender.send(Box::pin(hook)).is_ok();
105 if result && let Some(notify) = self.get_notifies().get(index) {
106 notify.notify_one()
107 }
108 return result;
109 }
110 false
111 }
112
113 /// Shuts down the task pool.
114 ///
115 /// This function sets the shutdown flag and wakes all waiting worker threads,
116 /// causing them to exit their processing loops.
117 #[inline(always)]
118 pub fn shutdown(&self) {
119 self.get_shutdown().store(true, atomic::Ordering::Relaxed);
120 self.get_notifies()
121 .iter()
122 .for_each(|notify: &Arc<Notify>| notify.notify_one());
123 }
124}