1use std::future::Future;
2use std::time::Duration;
3use std::task::{Poll, Waker};
4use std::io::{Error, Result, ErrorKind};
5use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
6
7use parking_lot::{Mutex, Condvar};
8use futures::{future::BoxFuture,
9 stream::{Stream, BoxStream}};
10
11use crate::rt::{TaskId,
12 AsyncRuntime,
13 AsyncRuntimeExt,
14 AsyncTaskPool,
15 AsyncTaskPoolExt,
16 AsyncWait,
17 AsyncWaitAny,
18 AsyncWaitAnyCallback,
19 AsyncMapReduce,
20 AsyncPipelineResult,
21 LocalAsyncRuntime,
22 single_thread::{SingleTaskPool, SingleTaskRunner, SingleTaskRuntime},
23 spawn_worker_thread, wakeup_worker_thread};
24
25pub struct WorkerRuntime<
29 O: Default + 'static = (),
30 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O> = SingleTaskPool<O>,
31>(Arc<(
32 Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRuntime<O, P> )>);
36
37unsafe impl<
38 O: Default + 'static,
39 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
40> Send for WorkerRuntime<O, P> {}
41unsafe impl<
42 O: Default + 'static,
43 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
44> Sync for WorkerRuntime<O, P> {}
45
46impl<
47 O: Default + 'static,
48 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O>,
49> Clone for WorkerRuntime<O, P> {
50 fn clone(&self) -> Self {
51 WorkerRuntime(self.0.clone())
52 }
53}
54
55impl<
56 O: Default + 'static,
57 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
58> AsyncRuntime<O> for WorkerRuntime<O, P> {
59 type Pool = P;
60
61 #[inline]
63 fn shared_pool(&self) -> Arc<Self::Pool> {
64 (self.0).2.shared_pool()
65 }
66
67 #[inline]
69 fn get_id(&self) -> usize {
70 (self.0).2.get_id()
71 }
72
73 #[inline]
75 fn wait_len(&self) -> usize {
76 (self.0).2.wait_len()
77 }
78
79 #[inline]
81 fn len(&self) -> usize {
82 (self.0).2.len()
83 }
84
85 #[inline]
87 fn alloc<R: 'static>(&self) -> TaskId {
88 (self.0).2.alloc::<R>()
89 }
90
91 fn spawn<F>(&self, future: F) -> Result<TaskId>
93 where
94 F: Future<Output = O> + Send + 'static {
95 if !(self.0).0.load(Ordering::SeqCst) {
96 return Err(Error::new(ErrorKind::Other, "Spawn async task failed, reason: worker already closed"));
97 }
98
99 let result = (self.0).2.spawn(future);
100 wakeup_worker_thread(&(self.0).1, &(self.0).2);
101 result
102 }
103
104 fn spawn_local<F>(&self, future: F) -> Result<TaskId>
106 where
107 F: Future<Output=O> + Send + 'static {
108 if !(self.0).0.load(Ordering::SeqCst) {
109 return Err(Error::new(ErrorKind::Other, "Spawn local async task failed, reason: worker already closed"));
110 }
111
112 let result = (self.0).2.spawn_local(future);
113 wakeup_worker_thread(&(self.0).1, &(self.0).2);
114 result
115 }
116
117 fn spawn_priority<F>(&self, priority: usize, future: F) -> Result<TaskId>
119 where
120 F: Future<Output=O> + Send + 'static {
121 if !(self.0).0.load(Ordering::SeqCst) {
122 return Err(Error::new(ErrorKind::Other, "Spawn priority async task failed, reason: worker already closed"));
123 }
124
125 let result = (self.0).2.spawn_priority(priority, future);
126 wakeup_worker_thread(&(self.0).1, &(self.0).2);
127 result
128 }
129
130 fn spawn_yield<F>(&self, future: F) -> Result<TaskId>
132 where
133 F: Future<Output=O> + Send + 'static {
134 if !(self.0).0.load(Ordering::SeqCst) {
135 return Err(Error::new(ErrorKind::Other, "Spawn yield async task failed, reason: worker already closed"));
136 }
137
138 let result = (self.0).2.spawn_yield(future);
139 wakeup_worker_thread(&(self.0).1, &(self.0).2);
140 result
141 }
142
143 fn spawn_timing<F>(&self, future: F, time: usize) -> Result<TaskId>
145 where
146 F: Future<Output = O> + Send + 'static {
147 if !(self.0).0.load(Ordering::SeqCst) {
148 return Err(Error::new(ErrorKind::Other, "Spawn timing async task failed, reason: worker already closed"));
149 }
150
151 let result = (self.0).2.spawn_timing(future, time);
152 wakeup_worker_thread(&(self.0).1, &(self.0).2);
153 result
154 }
155
156 fn spawn_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
158 where
159 F: Future<Output=O> + Send + 'static {
160 if !(self.0).0.load(Ordering::SeqCst) {
161 return Err(Error::new(ErrorKind::Other, "Spawn async task by id failed, reason: worker already closed"));
162 }
163
164 let result = (self.0).2.spawn_by_id(task_id, future);
165 wakeup_worker_thread(&(self.0).1, &(self.0).2);
166 result
167 }
168
169 fn spawn_local_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
171 where
172 F: Future<Output=O> + Send + 'static {
173 if !(self.0).0.load(Ordering::SeqCst) {
174 return Err(Error::new(ErrorKind::Other, "Spawn local async task by id failed, reason: worker already closed"));
175 }
176
177 let result = (self.0).2.spawn_local_by_id(task_id, future);
178 wakeup_worker_thread(&(self.0).1, &(self.0).2);
179 result
180 }
181
182 fn spawn_priority_by_id<F>(&self,
184 task_id: TaskId,
185 priority: usize,
186 future: F) -> Result<()>
187 where
188 F: Future<Output=O> + Send + 'static {
189 if !(self.0).0.load(Ordering::SeqCst) {
190 return Err(Error::new(ErrorKind::Other, "Spawn priority async task by id failed, reason: worker already closed"));
191 }
192
193 let result = (self.0).2.spawn_priority_by_id(task_id, priority, future);
194 wakeup_worker_thread(&(self.0).1, &(self.0).2);
195 result
196 }
197
198 fn spawn_yield_by_id<F>(&self, task_id: TaskId, future: F) -> Result<()>
200 where
201 F: Future<Output=O> + Send + 'static {
202 if !(self.0).0.load(Ordering::SeqCst) {
203 return Err(Error::new(ErrorKind::Other, "Spawn yield async task by id failed, reason: worker already closed"));
204 }
205
206 let result = (self.0).2.spawn_yield_by_id(task_id, future);
207 wakeup_worker_thread(&(self.0).1, &(self.0).2);
208 result
209 }
210
211 fn spawn_timing_by_id<F>(&self,
213 task_id: TaskId,
214 future: F,
215 time: usize) -> Result<()>
216 where
217 F: Future<Output=O> + Send + 'static {
218 if !(self.0).0.load(Ordering::SeqCst) {
219 return Err(Error::new(ErrorKind::Other, "Spawn timing async task by id failed, reason: worker already closed"));
220 }
221
222 let result = (self.0).2.spawn_timing_by_id(task_id, future, time);
223 wakeup_worker_thread(&(self.0).1, &(self.0).2);
224 result
225 }
226
227 #[inline]
229 fn pending<Output: 'static>(&self, task_id: &TaskId, waker: Waker) -> Poll<Output> {
230 (self.0).2.pending::<Output>(task_id, waker)
231 }
232
233 #[inline]
235 fn wakeup<Output: 'static>(&self, task_id: &TaskId) {
236 (self.0).2.wakeup::<Output>(task_id);
237 }
238
239 #[inline]
241 fn wait<V: Send + 'static>(&self) -> AsyncWait<V> {
242 (self.0).2.wait()
243 }
244
245 #[inline]
247 fn wait_any<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
248 (self.0).2.wait_any(capacity)
249 }
250
251 #[inline]
253 fn wait_any_callback<V: Send + 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
254 (self.0).2.wait_any_callback(capacity)
255 }
256
257 #[inline]
259 fn map_reduce<V: Send + 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
260 (self.0).2.map_reduce(capacity)
261 }
262
263 #[inline]
265 fn timeout(&self, timeout: usize) -> BoxFuture<'static, ()> {
266 (self.0).2.timeout(timeout)
267 }
268
269 #[inline]
271 fn yield_now(&self) -> BoxFuture<'static, ()> {
272 (self.0).2.yield_now()
273 }
274
275 #[inline]
277 fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> BoxStream<'static, FO>
278 where S: Stream<Item = SO> + Send + 'static,
279 SO: Send + 'static,
280 F: FnMut(SO) -> AsyncPipelineResult<FO> + Send + 'static,
281 FO: Send + 'static {
282 (self.0).2.pipeline(input, filter)
283 }
284
285 fn close(&self) -> bool {
287 if cfg!(target_arch = "aarch64") {
288 if let Ok(true) = (self.0).0.compare_exchange(true,
289 false,
290 Ordering::SeqCst,
291 Ordering::SeqCst) {
292 wakeup_worker_thread(&(self.0).1, &(self.0).2);
294 true
295 } else {
296 false
297 }
298 } else {
299 if let Ok(true) = (self.0).0.compare_exchange_weak(true,
300 false,
301 Ordering::SeqCst,
302 Ordering::SeqCst) {
303 wakeup_worker_thread(&(self.0).1, &(self.0).2);
305 true
306 } else {
307 false
308 }
309 }
310 }
311}
312
313impl<
314 O: Default + 'static,
315 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
316> AsyncRuntimeExt<O> for WorkerRuntime<O, P> {
317 #[inline]
318 fn spawn_with_context<F, C>(&self,
319 task_id: TaskId,
320 future: F,
321 context: C) -> Result<()>
322 where F: Future<Output = O> + Send + 'static,
323 C: 'static {
324 (self.0).2.spawn_with_context(task_id, future, context)
325 }
326
327 #[inline]
328 fn spawn_timing_with_context<F, C>(&self,
329 task_id: TaskId,
330 future: F,
331 context: C,
332 time: usize) -> Result<()>
333 where F: Future<Output = O> + Send + 'static,
334 C: Send + 'static {
335 (self.0).2.spawn_timing_with_context(task_id, future, context, time)
336 }
337
338 #[inline]
339 fn block_on<F>(&self, future: F) -> Result<F::Output>
340 where F: Future + Send + 'static,
341 <F as Future>::Output: Default + Send + 'static {
342 (self.0).2.block_on::<F>(future)
343 }
344}
345
346impl<
347 O: Default + 'static,
348 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
349> WorkerRuntime<O, P> {
350 pub fn get_worker_status(&self) -> &Arc<AtomicBool> {
352 &(self.0).0
353 }
354
355 pub fn get_worker_waker(&self) -> &Arc<(AtomicBool, Mutex<()>, Condvar)> {
357 &(self.0).1
358 }
359
360 pub fn get_worker_runtime(&self) -> &SingleTaskRuntime<O, P> {
362 &(self.0).2
363 }
364
365 pub fn to_local_runtime(&self) -> LocalAsyncRuntime<O> {
367 LocalAsyncRuntime {
368 inner: self.as_raw(),
369 get_id_func: WorkerRuntime::<O, P>::get_id_raw,
370 spawn_func: WorkerRuntime::<O, P>::spawn_raw,
371 spawn_timing_func: WorkerRuntime::<O, P>::spawn_timing_raw,
372 timeout_func: WorkerRuntime::<O, P>::timeout_raw,
373 }
374 }
375
376 #[inline]
378 pub(crate) fn as_raw(&self) -> *const () {
379 Arc::into_raw(self.0.clone()) as *const ()
380 }
381
382 #[inline]
384 pub(crate) fn from_raw(raw: *const ()) -> Self {
385 let inner = unsafe {
386 Arc::from_raw(raw as *const (
387 Arc<AtomicBool>,
388 Arc<(AtomicBool, Mutex<()>, Condvar)>,
389 SingleTaskRuntime<O, P>),
390 )
391 };
392 WorkerRuntime(inner)
393 }
394
395 pub(crate) fn get_id_raw(raw: *const ()) -> usize {
397 let rt = WorkerRuntime::<O, P>::from_raw(raw);
398 let id = rt.get_id();
399 Arc::into_raw(rt.0); id
401 }
402
403 pub(crate) fn spawn_raw(raw: *const (),
405 future: BoxFuture<'static, O>) -> Result<()> {
406 let rt = WorkerRuntime::<O, P>::from_raw(raw);
407 let result = rt.spawn_by_id(rt.alloc::<O>(), future);
408 Arc::into_raw(rt.0); result
410 }
411
412 pub(crate) fn spawn_timing_raw(raw: *const (),
414 future: BoxFuture<'static, O>,
415 timeout: usize) -> Result<()> {
416 let rt = WorkerRuntime::<O, P>::from_raw(raw);
417 let result = rt.spawn_timing_by_id(rt.alloc::<O>(), future, timeout);
418 Arc::into_raw(rt.0); result
420 }
421
422 pub(crate) fn timeout_raw(raw: *const (),
424 timeout: usize) -> BoxFuture<'static, ()> {
425 let rt = WorkerRuntime::<O, P>::from_raw(raw);
426 let boxed = rt.timeout(timeout);
427 Arc::into_raw(rt.0); boxed
429 }
430}
431
432pub struct WorkerTaskRunner<
436 O: Default + 'static = (),
437 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P> = SingleTaskPool<O>,
438>(Arc<(
439 Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRunner<O, P>, WorkerRuntime<O, P>, )>);
444
445unsafe impl<
446 O: Default + 'static,
447 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
448> Send for WorkerTaskRunner<O, P> {}
449unsafe impl<
450 O: Default + 'static,
451 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
452> Sync for WorkerTaskRunner<O, P> {}
453
454impl<
455 O: Default + 'static,
456 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
457> Clone for WorkerTaskRunner<O, P> {
458 fn clone(&self) -> Self {
459 WorkerTaskRunner(self.0.clone())
460 }
461}
462
463impl<
464 O: Default + 'static,
465 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
466> From<(Arc<AtomicBool>, Arc<(AtomicBool, Mutex<()>, Condvar)>, SingleTaskRuntime<O, P>)> for WorkerRuntime<O, P> {
467 fn from(from: (Arc<AtomicBool>,
469 Arc<(AtomicBool, Mutex<()>, Condvar)>,
470 SingleTaskRuntime<O, P>,)) -> Self {
471 WorkerRuntime(Arc::new(from))
472 }
473}
474
475impl<O: Default + 'static> Default for WorkerTaskRunner<O> {
476 fn default() -> Self {
477 WorkerTaskRunner::new(SingleTaskPool::default(),
478 Arc::new(AtomicBool::new(true)),
479 Arc::new((AtomicBool::new(false), Mutex::new(()), Condvar::new())))
480 }
481}
482
483impl<
484 O: Default + 'static,
485 P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
486> WorkerTaskRunner<O, P> {
487 pub fn new(pool: P,
489 worker_status: Arc<AtomicBool>,
490 worker_waker: Arc<(AtomicBool, Mutex<()>, Condvar)>) -> Self {
491 let runner = SingleTaskRunner::new(pool);
492 let rt = runner.startup().unwrap();
493 let inner = (worker_status.clone(), worker_waker.clone(), rt);
494 let runtime = WorkerRuntime(Arc::new(inner));
495
496 let inner = (worker_status,
497 worker_waker,
498 runner,
499 runtime);
500
501 WorkerTaskRunner(Arc::new(inner))
502 }
503
504 pub fn get_runtime(&self) -> WorkerRuntime<O, P> {
506 (self.0).3.clone()
507 }
508
509 #[inline]
511 pub fn run_once(&self) -> Result<usize> {
512 (self.0).2.run_once()
513 }
514
515 #[inline]
517 pub fn run(&self) -> Result<usize> {
518 (self.0).2.run()
519 }
520
521 pub fn startup<LF, GQL>(self,
523 thread_name: &str,
524 thread_stack_size: usize,
525 sleep_timeout: u64,
526 loop_interval: Option<u64>,
527 loop_func: LF,
528 get_queue_len: GQL) -> WorkerRuntime<O, P>
529 where P: AsyncTaskPoolExt<O> + AsyncTaskPool<O, Pool = P>,
530 LF: Fn() -> (bool, Duration) + Send + 'static,
531 GQL: Fn() -> usize + Send + 'static{
532 let rt_copy = (self.0).3.clone();
533 let thread_handler = (self.0).0.clone();
534 let thread_waker = (self.0).1.clone();
535 spawn_worker_thread(
536 thread_name,
537 thread_stack_size,
538 thread_handler,
539 thread_waker,
540 sleep_timeout,
541 loop_interval,
542 loop_func,
543 move || {
544 rt_copy.wait_len() + get_queue_len()
545 },
546 );
547
548 (self.0).3.clone()
549 }
550}