grafix_toolbox/kit/policies/
task.rs1pub mod pre {
2 pub use super::StreamExtKit;
3 pub use io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4 pub use tokio::{fs, io, sync::Notify};
5 pub use tokio_stream::{Stream, StreamExt};
6 pub mod stream {
7 pub use futures_lite::stream::{Boxed, once_future as once, unfold};
8 pub use tokio_stream::iter;
9 }
10 pub use futures_lite::future::FutureExt;
11 pub type BoxFut<'s, T> = std::pin::Pin<Box<dyn Future<Output = T> + Send + 's>>;
12}
13
14#[allow(async_fn_in_trait)]
15pub trait StreamExtKit: Stream {
16 async fn count(self) -> usize;
17 async fn for_each(self, f: impl FnMut(Self::Item));
18}
19impl<S: StreamExt> StreamExtKit for S {
20 async fn count(self) -> usize {
21 self.fold(0, |s, _| s + 1).await
22 }
23 async fn for_each(self, mut f: impl FnMut(Self::Item)) {
24 self.fold((), |_, i| f(i)).await
25 }
26}
27
28pub fn GLRuntime() -> &'static RT {
29 InitGLRuntime::<WindowImpl>(None)
30}
31pub(in crate::kit) fn InitGLRuntime<W: Window>(w: Option<&mut W>) -> &'static RT {
32 static S: OnceLock<RT> = OnceLock::new();
33 S.get_or_init(|| {
34 let w = w.unwrap_or_else(|| ERROR!("GLRuntime() accessed before Window()"));
35 let init = w.gl_ctx_maker();
36 rt::Builder::new_multi_thread()
37 .worker_threads(1)
38 .max_blocking_threads(1)
39 .enable_time()
40 .build()
41 .explain_err(|| "Cannot create gl runtime")
42 .fail()
43 .tap(|s| {
44 s.spawn(async { Box(init()).pipe(Box::leak) });
45 })
46 .pipe(RT)
47 })
48}
49
50pub fn Runtime() -> &'static RT {
51 static S: OnceLock<RT> = OnceLock::new();
52 S.get_or_init(|| {
53 rt::Builder::new_multi_thread()
54 .worker_threads(3)
55 .max_blocking_threads(2)
56 .enable_time()
57 .build()
58 .expect("E| Cannot create async runtime")
59 .pipe(RT)
60 })
61}
62impl RT {
63 pub fn spawn<T: SendS, F: Fut<T>>(&self, f: impl FnOnce() -> F + SendS) -> Task<T> {
64 self.0.spawn(async { f().await }).pipe(Some).pipe(Task)
65 }
66 pub fn finish<T: SendS>(&self, mut t: Task<T>) -> T {
67 self.finish_ref(&mut t)
68 }
69 pub fn cancel<T: SendS>(&self, mut t: Task<T>) {
70 t.0.take().map(|h| {
71 h.abort();
72 let _ = self.block_on(h);
73 });
74 }
75 pub fn finish_ref<T: SendS>(&self, t: &mut Task<T>) -> T {
76 let t = t.0.take().valid();
77 self.block_on(t).valid()
78 }
79 fn block_on<T: SendS>(&self, h: JoinHandle<T>) -> Res<T> {
80 if let Ok(r) = rt::Handle::try_current() {
81 WARN!("Blocking code polluted async");
82 let (sn, rx) = oneshot::channel();
83 r.spawn(async { sn.send(h.await) });
84 task::block_in_place(|| rx.blocking_recv().valid()).res()
85 } else {
86 self.0.block_on(h).res()
87 }
88 }
89}
90pub struct RT(rt::Runtime);
91
92pub struct Task<T>(Option<JoinHandle<T>>);
93impl<T> Task<T> {
94 pub fn new_uninit() -> Self {
95 Self(None)
96 }
97 pub fn detach(mut self) {
98 let _ = self.0.take();
99 }
100 pub fn is_ready(&self) -> bool {
101 self.0.as_ref().is_some_and(|s| s.is_finished())
102 }
103}
104impl<T> Drop for Task<T> {
105 fn drop(&mut self) {
106 self.0.take().map(|h| h.abort());
107 }
108}
109impl<T> Debug for Task<T> {
110 fn fmt(&self, f: &mut Formatter) -> fmtRes {
111 let h = &self.0.as_ref().map(|h| h.id());
112 f.debug_tuple("Task").field(h).field(&type_name::<T>()).finish()
113 }
114}
115
116use tokio::{runtime as rt, sync::oneshot, task::JoinHandle};
117use {crate::lib::*, GL::window::*};