Skip to main content

grafix_toolbox/kit/policies/
task.rs

1pub mod pre {
2	pub use super::StreamExtKit;
3	pub use io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
4	pub use tokio::{fs, io, sync::Notify};
5	pub use tokio_stream::{Stream, StreamExt};
6	pub mod stream {
7		pub use futures_lite::stream::{Boxed, once_future as once, unfold};
8		pub use tokio_stream::iter;
9	}
10	pub use futures_lite::future::FutureExt;
11	pub type BoxFut<'s, T> = std::pin::Pin<Box<dyn Future<Output = T> + Send + 's>>;
12}
13
14#[allow(async_fn_in_trait)]
15pub trait StreamExtKit: Stream {
16	async fn count(self) -> usize;
17	async fn for_each(self, f: impl FnMut(Self::Item));
18}
19impl<S: StreamExt> StreamExtKit for S {
20	async fn count(self) -> usize {
21		self.fold(0, |s, _| s + 1).await
22	}
23	async fn for_each(self, mut f: impl FnMut(Self::Item)) {
24		self.fold((), |_, i| f(i)).await
25	}
26}
27
28pub fn GLRuntime() -> &'static RT {
29	InitGLRuntime::<WindowImpl>(None)
30}
31pub(in crate::kit) fn InitGLRuntime<W: Window>(w: Option<&mut W>) -> &'static RT {
32	static S: OnceLock<RT> = OnceLock::new();
33	S.get_or_init(|| {
34		let w = w.unwrap_or_else(|| ERROR!("GLRuntime() accessed before Window()"));
35		let init = w.gl_ctx_maker();
36		rt::Builder::new_multi_thread()
37			.worker_threads(1)
38			.max_blocking_threads(1)
39			.enable_time()
40			.build()
41			.explain_err(|| "Cannot create gl runtime")
42			.fail()
43			.tap(|s| {
44				s.spawn(async { Box(init()).pipe(Box::leak) });
45			})
46			.pipe(RT)
47	})
48}
49
50pub fn Runtime() -> &'static RT {
51	static S: OnceLock<RT> = OnceLock::new();
52	S.get_or_init(|| {
53		rt::Builder::new_multi_thread()
54			.worker_threads(3)
55			.max_blocking_threads(2)
56			.enable_time()
57			.build()
58			.expect("E| Cannot create async runtime")
59			.pipe(RT)
60	})
61}
62impl RT {
63	pub fn spawn<T: SendS, F: Fut<T>>(&self, f: impl FnOnce() -> F + SendS) -> Task<T> {
64		self.0.spawn(async { f().await }).pipe(Some).pipe(Task)
65	}
66	pub fn finish<T: SendS>(&self, mut t: Task<T>) -> T {
67		self.finish_ref(&mut t)
68	}
69	pub fn cancel<T: SendS>(&self, mut t: Task<T>) {
70		t.0.take().map(|h| {
71			h.abort();
72			let _ = self.block_on(h);
73		});
74	}
75	pub fn finish_ref<T: SendS>(&self, t: &mut Task<T>) -> T {
76		let t = t.0.take().valid();
77		self.block_on(t).valid()
78	}
79	fn block_on<T: SendS>(&self, h: JoinHandle<T>) -> Res<T> {
80		if let Ok(r) = rt::Handle::try_current() {
81			WARN!("Blocking code polluted async");
82			let (sn, rx) = oneshot::channel();
83			r.spawn(async { sn.send(h.await) });
84			task::block_in_place(|| rx.blocking_recv().valid()).res()
85		} else {
86			self.0.block_on(h).res()
87		}
88	}
89}
90pub struct RT(rt::Runtime);
91
92pub struct Task<T>(Option<JoinHandle<T>>);
93impl<T> Task<T> {
94	pub fn new_uninit() -> Self {
95		Self(None)
96	}
97	pub fn detach(mut self) {
98		let _ = self.0.take();
99	}
100	pub fn is_ready(&self) -> bool {
101		self.0.as_ref().is_some_and(|s| s.is_finished())
102	}
103}
104impl<T> Drop for Task<T> {
105	fn drop(&mut self) {
106		self.0.take().map(|h| h.abort());
107	}
108}
109impl<T> Debug for Task<T> {
110	fn fmt(&self, f: &mut Formatter) -> fmtRes {
111		let h = &self.0.as_ref().map(|h| h.id());
112		f.debug_tuple("Task").field(h).field(&type_name::<T>()).finish()
113	}
114}
115
116use tokio::{runtime as rt, sync::oneshot, task::JoinHandle};
117use {crate::lib::*, GL::window::*};