1use std::thread;
18use std::task::Context;
19use std::future::Future;
20use std::cell::UnsafeCell;
21use std::task::Poll::Pending;
22use std::collections::VecDeque;
23use std::io::Result as IOResult;
24use std::sync::{
25 atomic::{AtomicBool, Ordering},
26 Arc,
27};
28use std::sync::atomic::AtomicUsize;
29use std::time::{Duration, Instant};
30
31use async_stream::stream;
32use crossbeam_queue::SegQueue;
33use flume::bounded as async_bounded;
34use futures::{
35 future::{FutureExt, LocalBoxFuture},
36 stream::{LocalBoxStream, Stream, StreamExt},
37 task::{waker_ref, ArcWake},
38};
39#[cfg(not(target_arch = "wasm32"))]
40use polling::{Events, Poller};
41
42use crate::{
43 rt::{
44 alloc_rt_uid,
45 serial::{AsyncMapReduce, AsyncWait, AsyncWaitAny, AsyncWaitAnyCallback},
46 AsyncPipelineResult, YieldNow
47 },
48};
49
50pub(crate) struct LocalTask<O: Default + 'static = ()> {
52 inner: UnsafeCell<Option<LocalBoxFuture<'static, O>>>, runtime: LocalTaskRuntime<O>, }
55
56unsafe impl<O: Default + 'static> Send for LocalTask<O> {}
57unsafe impl<O: Default + 'static> Sync for LocalTask<O> {}
58
59impl<O: Default + 'static> ArcWake for LocalTask<O> {
60 fn wake_by_ref(arc_self: &Arc<Self>) {
61 arc_self.runtime.will_wakeup(arc_self.clone());
62 }
63}
64
65impl<O: Default + 'static> LocalTask<O> {
66 pub fn get_inner(&self) -> Option<LocalBoxFuture<'static, O>> {
68 unsafe { (&mut *self.inner.get()).take() }
69 }
70
71 pub fn set_inner(&self, inner: Option<LocalBoxFuture<'static, O>>) {
73 unsafe {
74 *self.inner.get() = inner;
75 }
76 }
77}
78
79#[cfg(not(target_arch = "wasm32"))]
83pub struct LocalTaskRuntime<O: Default + 'static = ()>(
84 Arc<(
85 usize, Arc<AtomicBool>, SegQueue<Arc<LocalTask<O>>>, UnsafeCell<VecDeque<Arc<LocalTask<O>>>>, Option<AtomicBool>, Option<Arc<Poller>>, )>,
92);
93#[cfg(target_arch = "wasm32")]
97pub struct LocalTaskRuntime<O: Default + 'static = ()>(
98 Arc<(
99 usize, Arc<AtomicBool>, SegQueue<Arc<LocalTask<O>>>, UnsafeCell<VecDeque<Arc<LocalTask<O>>>>, Option<AtomicBool>, )>,
105);
106
107unsafe impl<O: Default + 'static> Send for LocalTaskRuntime<O> {}
108impl<O: Default + 'static> !Sync for LocalTaskRuntime<O> {}
109
110impl<O: Default + 'static> Clone for LocalTaskRuntime<O> {
111 fn clone(&self) -> Self {
112 LocalTaskRuntime(self.0.clone())
113 }
114}
115
116impl<O: Default + 'static> LocalTaskRuntime<O> {
117 #[inline]
119 pub fn is_running(&self) -> bool {
120 self.0.1.load(Ordering::Relaxed)
121 }
122
123 pub fn get_id(&self) -> usize {
125 self.0.0
126 }
127
128 pub fn len(&self) -> usize {
130 unsafe {
131 (self.0).2.len() + self.internal_len()
132 }
133 }
134
135 #[inline]
137 pub(crate) fn internal_len(&self) -> usize {
138 unsafe {
139 (&*self.0.3.get()).len()
140 }
141 }
142
143 pub fn spawn<F>(&self, future: F)
145 where
146 F: Future<Output = O> + 'static,
147 {
148 unsafe {
149 (&mut *(self.0).3.get()).push_back(Arc::new(LocalTask {
150 inner: UnsafeCell::new(Some(future.boxed_local())),
151 runtime: self.clone(),
152 }));
153 }
154 }
155
156 #[inline]
158 pub(crate) fn will_wakeup(&self, task: Arc<LocalTask<O>>) {
159 self.0.2.push(task);
160 }
161
162 pub fn send<F>(&self, future: F)
164 where
165 F: Future<Output = O> + 'static,
166 {
167 self.0.2.push(Arc::new(LocalTask {
168 inner: UnsafeCell::new(Some(future.boxed_local())),
169 runtime: self.clone(),
170 }));
171
172 #[cfg(not(target_arch = "wasm32"))]
173 if let Some(sleeping) = &self.0.4 {
174 if sleeping.compare_exchange(
175 false,
176 true,
177 Ordering::AcqRel,
178 Ordering::Relaxed).is_ok()
179 {
180 let _ = self
182 .0
183 .5
184 .as_ref()
185 .unwrap()
186 .notify();
187 }
188 }
189 }
190
191 #[inline]
193 pub fn poll(&self) {
194 let internal = unsafe { &mut * (self.0).3.get() };
195 while let Some(task) = (self.0).2.pop() {
196 internal.push_back(task);
197 }
198 }
199
200 pub fn wait<V: 'static>(&self) -> AsyncWait<V> {
202 AsyncWait::new(self.wait_any(2))
203 }
204
205 pub fn wait_any<V: 'static>(&self, capacity: usize) -> AsyncWaitAny<V> {
207 let (producor, consumer) = async_bounded(capacity);
208
209 AsyncWaitAny::new(capacity, producor, consumer)
210 }
211
212 pub fn wait_any_callback<V: 'static>(&self, capacity: usize) -> AsyncWaitAnyCallback<V> {
214 let (producor, consumer) = async_bounded(capacity);
215
216 AsyncWaitAnyCallback::new(capacity, producor, consumer)
217 }
218
219 pub fn map_reduce<V: 'static>(&self, capacity: usize) -> AsyncMapReduce<V> {
221 let (producor, consumer) = async_bounded(capacity);
222
223 AsyncMapReduce::new(0, capacity, producor, consumer)
224 }
225
226 pub fn yield_now(&self) -> LocalBoxFuture<'static, ()> {
228 async move {
229 YieldNow(false).await;
230 }.boxed_local()
231 }
232
233 pub fn pipeline<S, SO, F, FO>(&self, input: S, mut filter: F) -> LocalBoxStream<'static, FO>
235 where
236 S: Stream<Item = SO> + 'static,
237 SO: 'static,
238 F: FnMut(SO) -> AsyncPipelineResult<FO> + 'static,
239 FO: 'static,
240 {
241 let output = stream! {
242 for await value in input {
243 match filter(value) {
244 AsyncPipelineResult::Disconnect => {
245 break;
247 },
248 AsyncPipelineResult::Filtered(result) => {
249 yield result;
250 },
251 }
252 }
253 };
254
255 output.boxed_local()
256 }
257
258 pub fn block_on<F>(&self, future: F) -> IOResult<F::Output>
260 where
261 F: Future + 'static,
262 <F as Future>::Output: Default + 'static,
263 {
264 let runner = LocalTaskRunner(self.clone());
265 let mut result: Option<<F as Future>::Output> = None;
266 let result_ptr = (&mut result) as *mut Option<<F as Future>::Output>;
267
268 self.spawn(async move {
269 let r = future.await;
271 unsafe {
272 *result_ptr = Some(r);
273 }
274
275 Default::default()
276 });
277
278 loop {
279 while self.internal_len() > 0 {
281 runner.run_once();
282 }
283
284 if let Some(result) = result.take() {
286 return Ok(result);
288 }
289 }
290 }
291
292 pub fn close(self) -> bool {
294 if cfg!(target_arch = "aarch64") {
295 if let Ok(true) =
296 (self.0)
297 .1
298 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
299 {
300 true
302 } else {
303 false
304 }
305 } else {
306 if let Ok(true) =
307 (self.0)
308 .1
309 .compare_exchange_weak(true, false, Ordering::SeqCst, Ordering::SeqCst)
310 {
311 true
313 } else {
314 false
315 }
316 }
317 }
318}
319
320pub struct LocalTaskRunner<O: Default + 'static = ()>(LocalTaskRuntime<O>);
324
325unsafe impl<O: Default + 'static> Send for LocalTaskRunner<O> {}
326impl<O: Default + 'static> !Sync for LocalTaskRunner<O> {}
327
328impl<O: Default + 'static> LocalTaskRunner<O> {
329 pub fn new() -> Self {
331 #[cfg(not(target_arch = "wasm32"))]
332 let inner = (
333 alloc_rt_uid(),
334 Arc::new(AtomicBool::new(false)),
335 SegQueue::new(),
336 UnsafeCell::new(VecDeque::new()),
337 None,
338 None,
339 );
340 #[cfg(target_arch = "wasm32")]
341 let inner = (
342 crate::rt::alloc_rt_uid(),
343 Arc::new(AtomicBool::new(false)),
344 SegQueue::new(),
345 UnsafeCell::new(VecDeque::new()),
346 None,
347 );
348
349 LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
350 }
351
352 #[cfg(not(target_arch = "wasm32"))]
355 pub fn with_poll(poller: Arc<Poller>) -> Self {
356 let inner = (
357 alloc_rt_uid(),
358 Arc::new(AtomicBool::new(false)),
359 SegQueue::new(),
360 UnsafeCell::new(VecDeque::new()),
361 Some(AtomicBool::new(false)),
362 Some(poller),
363 );
364
365 LocalTaskRunner(LocalTaskRuntime(Arc::new(inner)))
366 }
367
368 #[cfg(not(target_arch = "wasm32"))]
370 #[inline(always)]
371 pub fn is_with_polling(&self) -> bool {
372 self.0.0.4.is_some()
373 && self.0.0.5.is_some()
374 }
375
376 pub fn get_runtime(&self) -> LocalTaskRuntime<O> {
378 self.0.clone()
379 }
380
381 pub fn startup(
383 self,
384 thread_name: &str,
385 thread_stack_size: usize
386 ) -> LocalTaskRuntime<O> {
387 let rt = self.get_runtime();
388 let rt_copy = rt.clone();
389 let _ = thread::Builder::new()
390 .name(thread_name.to_string())
391 .stack_size(thread_stack_size)
392 .spawn(move || {
393 (rt_copy.0).1.store(true, Ordering::Relaxed);
394
395 while rt_copy.is_running() {
396 self.poll();
397 self.run_once();
398 }
399 });
400
401 rt
402 }
403
404 #[cfg(not(target_arch = "wasm32"))]
406 pub fn startup_with_poll(
407 self,
408 thread_name: &str,
409 thread_stack_size: usize,
410 try_count: usize,
411 timeout: Option<Duration>,
412 ) -> LocalTaskRuntime<O> {
413 let rt = self.get_runtime();
414 let rt_copy = rt.clone();
415 let _ = thread::Builder::new()
416 .name(thread_name.to_string())
417 .stack_size(thread_stack_size)
418 .spawn(move || {
419 (rt_copy.0).1.store(true, Ordering::Relaxed);
420
421 let mut count = try_count;
422 while rt_copy.is_running() {
423 self.poll();
424 self.run_once();
425 match self.try_sleep(count, timeout) {
426 Err(e) => {
427 rt_copy.0.1.store(false, Ordering::Release);
428 panic!("Run runtime failed, reason: {:?}", e);
429 },
430 Ok(Some(new_count)) => {
431 count = new_count;
432 continue;
433 },
434 Ok(None) => {
435 count = try_count;
436 continue;
437 },
438 }
439 }
440 });
441
442 rt
443 }
444
445 #[inline]
448 pub fn poll(&self) {
449 while let Some(task) = ((self.0).0).2.pop() {
450 unsafe {
451 (&mut *((self.0).0).3.get()).push_back(task);
452 }
453 }
454 }
455
456 #[inline]
458 pub fn run_once(&self) {
459 unsafe {
460 if let Some(task) = (&mut *(&(self.0).0).3.get()).pop_front() {
461 let waker = waker_ref(&task);
462 let mut context = Context::from_waker(&*waker);
463 if let Some(mut future) = task.get_inner() {
464 if let Pending = future.as_mut().poll(&mut context) {
465 task.set_inner(Some(future));
467 }
468 }
469 }
470 }
471 }
472
473 #[cfg(not(target_arch = "wasm32"))]
477 #[inline]
478 pub fn try_sleep(
479 &self,
480 try_count: usize,
481 timeout: Option<Duration>
482 ) -> IOResult<Option<usize>> {
483 if !self.is_with_polling() {
484 return Ok(Some(try_count));
485 }
486
487 if self.0.len() != 0 {
488 return Ok(Some(try_count));
490 }
491
492 if try_count != 0 {
493 return Ok(Some(try_count - 1));
495 }
496
497 let mut events = Events::with_capacity(std::num::NonZeroUsize::new(1).unwrap());
498 let _ = self
499 .0
500 .0
501 .5
502 .as_ref()
503 .unwrap()
504 .wait(&mut events, timeout)?;
505 self
506 .0
507 .0
508 .4
509 .as_ref()
510 .unwrap()
511 .store(false, Ordering::Release);
512
513 return Ok(None);
514 }
515
516 pub fn into_local(self) -> LocalTaskRuntime<O> {
518 self.0
519 }
520}
521
522#[test]
523fn test_local_runtime_block_on() {
524 struct AtomicCounter(AtomicUsize, Instant);
525 impl Drop for AtomicCounter {
526 fn drop(&mut self) {
527 {
528 println!(
529 "!!!!!!drop counter, count: {:?}, time: {:?}",
530 self.0.load(Ordering::Relaxed),
531 Instant::now() - self.1
532 );
533 }
534 }
535 }
536
537 let rt = LocalTaskRunner::<()>::new().into_local();
538
539 let counter = Arc::new(AtomicCounter(AtomicUsize::new(0), Instant::now()));
540 for _ in 0..10000000 {
541 let counter_copy = counter.clone();
542 let _ = rt.block_on(async move { counter_copy.0.fetch_add(1, Ordering::Relaxed) });
543 }
544}