1use std::future::Future;
2use std::time::Duration;
3use std::task::{Poll, Waker};
4use std::io::{Error, Result, ErrorKind};
5use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
6
7use parking_lot::{Mutex, Condvar};
8use futures::{future::LocalBoxFuture,
9 stream::{Stream, LocalBoxStream}};
10
11use crate::rt::{TaskId, AsyncPipelineResult,
12 serial::{AsyncRuntime,
13 AsyncRuntimeExt,
14 AsyncTaskPool,
15 AsyncTaskPoolExt,
16 AsyncWait,
17 AsyncWaitAny,
18 AsyncWaitAnyCallback,
19 AsyncMapReduce,
20 LocalAsyncRuntime,
21 spawn_worker_thread, wakeup_worker_thread},
22 serial_single_thread::{SingleTaskPool, SingleTaskRunner, SingleTaskRuntime}};
23
24pub struct WorkerRuntime<
28 O: Default + 'static = (),
29 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
30>(Arc<(
31 Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRuntime<O, P> )>);
35
36unsafe impl<
37 O: Default + 'static,
38 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
39> Send for WorkerRuntime<O, P> {}
40unsafe impl<
41 O: Default + 'static,
42 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
43> Sync for WorkerRuntime<O, P> {}
44
45impl<
46 O: Default + 'static,
47 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
48> Clone for WorkerRuntime<O, P> {
49 fn clone(&self) -> Self {
50 WorkerRuntime(self.0.clone())
51 }
52}
53
54impl<
55 O: Default + 'static,
56 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
57> AsyncRuntime<O> for WorkerRuntime<O, P> {
58 type Pool = P;
59
60 #[inline]
62 fn shared_pool(&self) -> Arc<Self::Pool> {
63 (self.0).2.shared_pool()
64 }
65
66 #[inline]
68 fn get_id(&self) -> usize {
69 (self.0).2.get_id()
70 }
71
72 #[inline]
74 fn wait_len(&self) -> usize {
75 (self.0).2.wait_len()
76 }
77
78 #[inline]
80 fn len(&self) -> usize {
81 (self.0).2.len()
82 }
83
84 #[inline]
86 fn alloc<R: 'static>(&self) -> TaskId {
87 (self.0).2.alloc::<R>()
88 }
89
90 fn spawn<F>(&self, future: F) -> Result<TaskId>
92 where F: Future<Output = O> + 'static {
93 if !(self.0).0.load(Ordering::SeqCst) {
94 return Err(Error::new(ErrorKind::Other, "Spawn async task failed, reason: worker already closed"));
95 }
96
97 let result = (self.0).2.spawn(future);
98 wakeup_worker_thread(&(self.0).1, &(self.0).2);
99 result
100 }
101
102 fn spawn_local<F>(&self, future: F) -> Result<TaskId>
104 where
105 F: Future<Output = O> + 'static {
106 if !(self.0).0.load(Ordering::SeqCst) {
107 return Err(Error::new(ErrorKind::Other, "Spawn local async task failed, reason: worker already closed"));
108 }
109
110 let result = (self.0).2.spawn_local(future);
111 wakeup_worker_thread(&(self.0).1, &(self.0).2);
112 result
113 }
114
115 fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
117 where
118 F: Future<Output = O> + 'static {
119 if !(self.0).0.load(Ordering::SeqCst) {
120 return Err(Error::new(ErrorKind::Other, "Spawn priority async task failed, reason: worker already closed"));
121 }
122
123 let result = (self.0).2.spawn_priority(priority, future);
124 wakeup_worker_thread(&(self.0).1, &(self.0).2);
125 result
126 }
127
128 fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
130 where
131 F: Future<Output = O> + 'static {
132 if !(self.0).0.load(Ordering::SeqCst) {
133 return Err(Error::new(ErrorKind::Other, "Spawn yield priority async task failed, reason: worker already closed"));
134 }
135
136 let result = (self.0).2.spawn_yield(future);
137 wakeup_worker_thread(&(self.0).1, &(self.0).2);
138 result
139 }
140
141 fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
143 where F: Future<Output = O> + 'static {
144 if !(self.0).0.load(Ordering::SeqCst) {
145 return Err(Error::new(ErrorKind::Other, "Spawn timing async task failed, reason: worker already closed"));
146 }
147
148 let result = (self.0).2.spawn_timing(future, time);
149 wakeup_worker_thread(&(self.0).1, &(self.0).2);
150 result
151 }
152
153 fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
155 where
156 F: Future<Output=O> + 'static {
157 if !(self.0).0.load(Ordering::SeqCst) {
158 return Err(Error::new(ErrorKind::Other, "Spawn async task by id failed, reason: worker already closed"));
159 }
160
161 let result = (self.0).2.spawn_by_id(task_id, future);
162 wakeup_worker_thread(&(self.0).1, &(self.0).2);
163 result
164 }
165
166 fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
168 where
169 F: Future<Output=O> + 'static {
170 if !(self.0).0.load(Ordering::SeqCst) {
171 return Err(Error::new(ErrorKind::Other, "Spawn local async task by id failed, reason: worker already closed"));
172 }
173
174 let result = (self.0).2.spawn_local_by_id(task_id, future);
175 wakeup_worker_thread(&(self.0).1, &(self.0).2);
176 result
177 }
178
179 fn spawn_priority_by_id<F>(&self,
181 task_id: TaskId,
182 priority: usize,
183 future: F) -> Result<()>
184 where
185 F: Future<Output=O> + 'static {
186 if !(self.0).0.load(Ordering::SeqCst) {
187 return Err(Error::new(ErrorKind::Other, "Spawn priority async task by id failed, reason: worker already closed"));
188 }
189
190 let result = (self.0).2.spawn_priority_by_id(task_id, priority, future);
191 wakeup_worker_thread(&(self.0).1, &(self.0).2);
192 result
193 }
194
195 fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
197 where
198 F: Future<Output=O> + 'static {
199 if !(self.0).0.load(Ordering::SeqCst) {
200 return Err(Error::new(ErrorKind::Other, "Spawn yield async task by id failed, reason: worker already closed"));
201 }
202
203 let result = (self.0).2.spawn_yield_by_id(task_id, future);
204 wakeup_worker_thread(&(self.0).1, &(self.0).2);
205 result
206 }
207
208 fn spawn_timing_by_id<F>(&self,
210 task_id: TaskId,
211 future: F,
212 time: usize) -> Result<()>
213 where
214 F: Future<Output=O> + 'static {
215 if !(self.0).0.load(Ordering::SeqCst) {
216 return Err(Error::new(ErrorKind::Other, "Spawn timing async task by id failed, reason: worker already closed"));
217 }
218
219 let result = (self.0).2.spawn_timing_by_id(task_id, future, time);
220 wakeup_worker_thread(&(self.0).1, &(self.0).2);
221 result
222 }
223
224 #[inline]
226 fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
227 (self.0).2.pending::<Output>(task_id, waker)
228 }
229
230 #[inline]
232 fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
233 (self.0).2.wakeup::<Output>(task_id);
234 }
235
236 #[inline]
238 fn wait<V: 'static>(&self) -> AsyncWait<V> {
239 (self.0).2.wait()
240 }
241
242 #[inline]
244 fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
245 (self.0).2.wait_any(capacity)
246 }
247
248 #[inline]
250 fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
251 (self.0).2.wait_any_callback(capacity)
252 }
253
254 #[inline]
256 fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
257 (self.0).2.map_reduce(capacity)
258 }
259
260 #[inline]
262 fn timeout(&self, timeout: usize) -> LocalBoxFuture<'static, ()> {
263 (self.0).2.timeout(timeout)
264 }
265
266 #[inline]
268 fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
269 (self.0).2.yield_now()
270 }
271
272 #[inline]
274 fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
275 where S: Stream<Item = SO> + 'static,
276 SO: 'static,
277 F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
278 FO: 'static {
279 (self.0).2.pipeline(input, filter)
280 }
281
282 fn close(&self) -> bool {
284 if cfg!(target_arch = "aarch64") {
285 if let Ok(true) = (self.0).0.compare_exchange(true,
286 false,
287 Ordering::SeqCst,
288 Ordering::SeqCst) {
289 wakeup_worker_thread(&(self.0).1, &(self.0).2);
291 true
292 } else {
293 false
294 }
295 } else {
296 if let Ok(true) = (self.0).0.compare_exchange_weak(true,
297 false,
298 Ordering::SeqCst,
299 Ordering::SeqCst) {
300 wakeup_worker_thread(&(self.0).1, &(self.0).2);
302 true
303 } else {
304 false
305 }
306 }
307 }
308}
309
310impl<
311 O: Default + 'static,
312 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
313> AsyncRuntimeExt<O> for WorkerRuntime<O, P> {
314 #[inline]
315 fn spawn_with_context<F, C>(&self,
316 task_id: TaskId,
317 future: F,
318 context: C) -> Result<()>
319 where F: Future<Output = O> + 'static,
320 C: 'static {
321 (self.0).2.spawn_with_context(task_id, future, context)
322 }
323
324 #[inline]
325 fn spawn_timing_with_context<F, C>(&self,
326 task_id: TaskId,
327 future: F,
328 context: C,
329 time: usize) -> Result<()>
330 where F: Future<Output = O> + 'static,
331 C: 'static {
332 (self.0).2.spawn_timing_with_context(task_id, future, context, time)
333 }
334
335 #[inline]
336 fn block_on<F>(&self, future: F) -> Result<F::Output>
337 where F: Future + 'static,
338 <F as Future>::Output: Default + 'static {
339 (self.0).2.block_on::<F>(future)
340 }
341}
342
343impl<
344 O: Default + 'static,
345 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
346> WorkerRuntime<O, P> {
347 pub fn get_worker_status(&self) -> &Arc<AtomicBool> {
349 &(self.0).0
350 }
351
352 pub fn get_worker_waker(&self) -> &Arc<(AtomicBool, Mutex<()>, Condvar)> {
354 &(self.0).1
355 }
356
357 pub fn get_worker_runtime(&self) -> &SingleTaskRuntime<O, P> {
359 &(self.0).2
360 }
361
362 pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
364 LocalAsyncRuntime::new(
365 self.as_raw(),
366 WorkerRuntime::<O, P>::get_id_raw,
367 WorkerRuntime::<O, P>::spawn_raw,
368 WorkerRuntime::<O, P>::spawn_timing_raw,
369 WorkerRuntime::<O, P>::timeout_raw
370 )
371 }
372
373 #[inline]
375 pub(crate) fn as_raw(&self) -> *const () {
376 Arc::into_raw(self.0.clone()) as *const ()
377 }
378
379 #[inline]
381 pub(crate) fn from_raw(raw: *const ()) -> Self {
382 let inner = unsafe {
383 Arc::from_raw(raw as *const (
384 Arc<AtomicBool>,
385 Arc<(AtomicBool, Mutex<()>, Condvar)>,
386 SingleTaskRuntime<O, P>),
387 )
388 };
389 WorkerRuntime(inner)
390 }
391
392 pub(crate) fn get_id_raw(raw: *const ()) -> usize {
394 let rt = WorkerRuntime::<O, P>::from_raw(raw);
395 let id = rt.get_id();
396 Arc::into_raw(rt.0); id
398 }
399
400 pub(crate) fn spawn_raw(raw: *const (),
402 future: LocalBoxFuture<'static, O>) -> Result<()> {
403 let rt = WorkerRuntime::<O, P>::from_raw(raw);
404 let result = rt.spawn_by_id(rt.alloc::<O>(), future);
405 Arc::into_raw(rt.0); result
407 }
408
409 pub(crate) fn spawn_timing_raw(raw: *const (),
411 future: LocalBoxFuture<'static, O>,
412 timeout: usize) -> Result<()> {
413 let rt = WorkerRuntime::<O, P>::from_raw(raw);
414 let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
415 Arc::into_raw(rt.0); result
417 }
418
419 pub(crate) fn timeout_raw(raw: *const (),
421 timeout: usize) -> LocalBoxFuture<'static, ()> {
422 let rt = WorkerRuntime::<O, P>::from_raw(raw);
423 let boxed = rt.timeout(timeout);
424 Arc::into_raw(rt.0); boxed
426 }
427}
428
429pub struct WorkerTaskRunner<
433 O: Default + 'static = (),
434 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> = SingleTaskPool<O>,
435>(Arc<(
436 Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRunner<O, P>, WorkerRuntime<O, P>, )>);
441
442unsafe impl<
443 O: Default + 'static,
444 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
445> Send for WorkerTaskRunner<O, P> {}
446unsafe impl<
447 O: Default + 'static,
448 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
449> Sync for WorkerTaskRunner<O, P> {}
450
451impl<
452 O: Default + 'static,
453 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
454> Clone for WorkerTaskRunner<O, P> {
455 fn clone(&self) -> Self {
456 WorkerTaskRunner(self.0.clone())
457 }
458}
459
460impl<
461 O: Default + 'static,
462 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
463> From<(Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRuntime<O, P>)> for WorkerRuntime<O, P> {
464 fn from(from: (Arc<AtomicBool>,
466 Arc<(AtomicBool, Mutex<()>, Condvar)>,
467 SingleTaskRuntime<O, P>,)) -> Self {
468 WorkerRuntime(Arc::new(from))
469 }
470}
471
472impl<O: Default + 'static> Default for WorkerTaskRunner<O> {
473 fn default() -> Self {
474 WorkerTaskRunner::new(SingleTaskPool::default(),
475 Arc::new(AtomicBool::new(true)),
476 Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new())))
477 }
478}
479
480impl<
481 O: Default + 'static,
482 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
483> WorkerTaskRunner<O, P> {
484 pub fn new(pool: P,
486 worker_status: Arc<AtomicBool>,
487 worker_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
488 let runner = SingleTaskRunner::new(pool);
489 let rt = runner.startup().unwrap();
490 let inner = (worker_status.clone(), worker_waker.clone(), rt);
491 let runtime = WorkerRuntime(Arc::new(inner));
492
493 let inner = (worker_status,
494 worker_waker,
495 runner,
496 runtime);
497
498 WorkerTaskRunner(Arc::new(inner))
499 }
500
501 pub fn get_runtime(&self) -> WorkerRuntime<O, P> {
503 (self.0).3.clone()
504 }
505
506 #[inline]
508 pub fn run_once(&self) -> Result<usize> {
509 (self.0).2.run_once()
510 }
511
512 #[inline]
514 pub fn run(&self) -> Result<usize> {
515 (self.0).2.run()
516 }
517
518 pub fn startup<LF, GQL>(self,
520 thread_name: &str,
521 thread_stack_size: usize,
522 sleep_timeout: u64,
523 loop_interval: Option<u64>,
524 loop_func: LF,
525 get_queue_len: GQL) -> WorkerRuntime<O, P>
526 where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
527 LF: Fn() -> (bool, Duration) + Send + 'static,
528 GQL: Fn() -> usize + Send + 'static{
529 let rt_copy = (self.0).3.clone();
530 let thread_handler = (self.0).0.clone();
531 let thread_waker = (self.0).1.clone();
532 spawn_worker_thread(
533 thread_name,
534 thread_stack_size,
535 thread_handler,
536 thread_waker,
537 sleep_timeout,
538 loop_interval,
539 loop_func,
540 move || {
541 rt_copy.wait_len() + get_queue_len()
542 },
543 );
544
545 (self.0).3.clone()
546 }
547}