datex_core/utils/
task_manager.rs1use crate::channel::mpsc::{UnboundedSender, create_unbounded_channel};
2use async_select::select;
3use core::{cell::RefCell, fmt::Debug, pin::Pin};
4use futures::future::Future;
5use futures_util::{StreamExt, stream::FuturesUnordered};
6
7use crate::prelude::*;
8pub type TaskFuture = Pin<Box<dyn Future<Output = ()>>>;
9
10pub struct TaskManager {
11 pub task_sender: RefCell<UnboundedSender<TaskFuture>>,
12}
13
14impl Debug for TaskManager {
15 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
16 f.debug_struct("TaskManager").finish()
17 }
18}
19
20impl TaskManager {
21 pub(crate) fn create() -> (TaskManager, impl Future<Output = ()>) {
23 let (sender, mut receiver) = create_unbounded_channel::<TaskFuture>();
24
25 (
26 TaskManager {
27 task_sender: RefCell::new(sender),
28 },
29 async move {
30 let mut tasks = FuturesUnordered::<TaskFuture>::new();
31
32 loop {
34 select! {
36 Some(_) = tasks.next() => {}
38
39 Some(new_fut) = receiver.next() => {
41 tasks.push(new_fut);
42 }
43 complete => unreachable!(),
44 }
45 }
46 },
47 )
48 }
49
50 pub(crate) fn register_task<F>(&self, fut: F)
52 where
53 F: Future<Output = ()> + 'static,
54 {
55 self.task_sender
56 .borrow_mut()
57 .start_send(Box::pin(fut))
58 .unwrap();
59 }
60}