http_type/task/impl.rs
1use crate::*;
2
3/// Creates a default Task instance.
4impl Default for Task {
5 /// Creates a default Task instance.
6 ///
7 /// # Returns
8 ///
9 /// - `Self`: The default instance.
10 #[inline(always)]
11 fn default() -> Self {
12 let worker_count: usize = Handle::try_current()
13 .map(|handle: Handle| handle.metrics().num_workers())
14 .unwrap_or_default()
15 .max(1);
16 Self::new(worker_count)
17 }
18}
19
20/// Automatically shuts down the task pool when the Task instance is dropped.
21impl Drop for Task {
22 /// Shuts down the task pool when the Task instance is dropped.
23 #[inline(always)]
24 fn drop(&mut self) {
25 self.shutdown();
26 }
27}
28
29impl Task {
30 /// Creates a new Task instance.
31 ///
32 /// # Arguments
33 ///
34 /// - `usize`: The number of worker threads to spawn.
35 ///
36 /// # Returns
37 ///
38 /// - `Self`: The new instance.
39 pub fn new(worker_count: usize) -> Self {
40 let mut pool: Vec<UnboundedSender<AsyncTask>> = Vec::with_capacity(worker_count);
41 let counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
42 let shutdown: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
43 let notifies: Vec<Arc<Notify>> =
44 (0..worker_count).map(|_| Arc::new(Notify::new())).collect();
45 for notify in notifies.iter().take(worker_count) {
46 let (sender, mut receiver): (UnboundedSender<AsyncTask>, UnboundedReceiver<AsyncTask>) =
47 unbounded_channel();
48 pool.push(sender);
49 let shutdown_clone: Arc<AtomicBool> = shutdown.clone();
50 let notify_clone: Arc<Notify> = notify.clone();
51 spawn_blocking(move || {
52 Handle::current().block_on(LocalSet::new().run_until(async move {
53 loop {
54 if shutdown_clone.load(atomic::Ordering::Relaxed) {
55 break;
56 }
57 match receiver.try_recv() {
58 Ok(task) => {
59 spawn_local(task);
60 }
61 Err(_) => {
62 notify_clone.notified().await;
63 }
64 }
65 }
66 }));
67 });
68 }
69 Self {
70 pool,
71 counter,
72 shutdown,
73 notifies,
74 }
75 }
76
77 /// Attempts to spawn a server task onto the global server task pool.
78 ///
79 /// This function sends the task to one of the worker threads in the pool.
80 /// The worker is selected using a round-robin algorithm based on an atomic counter,
81 /// or a forced index if provided.
82 ///
83 /// # Arguments
84 ///
85 /// - `Option<usize>`: An optional index to force selection of a specific worker.
86 /// If None, the worker is selected using round-robin distribution.
87 /// - `Future<Output = ()> + Send + 'static`: The future to spawn on the task pool.
88 ///
89 /// # Returns
90 ///
91 /// - `bool`: true if the task was successfully sent, false otherwise.
92 pub fn try_spawn_local<F>(&self, index_opt: Option<usize>, hook: F) -> bool
93 where
94 F: Future<Output = ()> + Send + 'static,
95 {
96 if self.get_pool().is_empty() {
97 return false;
98 }
99 let index: usize = index_opt
100 .unwrap_or(self.get_counter().fetch_add(1, atomic::Ordering::Relaxed))
101 .wrapping_rem(self.get_pool().len());
102 if let Some(sender) = self.get_pool().get(index) {
103 let result: bool = sender.send(Box::pin(hook)).is_ok();
104 if result && let Some(notify) = self.get_notifies().get(index) {
105 notify.notify_one()
106 }
107 return result;
108 }
109 false
110 }
111
112 /// Shuts down the task pool.
113 ///
114 /// This function sets the shutdown flag and wakes all waiting worker threads,
115 /// causing them to exit their processing loops.
116 #[inline(always)]
117 pub fn shutdown(&self) {
118 self.get_shutdown().store(true, atomic::Ordering::Relaxed);
119 self.get_notifies()
120 .iter()
121 .for_each(|notify: &Arc<Notify>| notify.notify_one());
122 }
123}