Skip to main content

datex_core/utils/
task_manager.rs

1use crate::channel::mpsc::{UnboundedSender, create_unbounded_channel};
2use async_select::select;
3use core::{cell::RefCell, fmt::Debug, pin::Pin};
4use futures::future::Future;
5use futures_util::{StreamExt, stream::FuturesUnordered};
6
7use crate::prelude::*;
/// A heap-allocated, pinned, type-erased future that resolves to `()`.
///
/// The trait object carries no `Send` or `Sync` bound; the `'static` bound
/// written here is the one Rust applies by default to `Box<dyn Trait>`.
pub type TaskFuture = Pin<Box<dyn Future<Output = ()> + 'static>>;
9
10pub struct TaskManager {
11    pub task_sender: RefCell<UnboundedSender<TaskFuture>>,
12}
13
14impl Debug for TaskManager {
15    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
16        f.debug_struct("TaskManager").finish()
17    }
18}
19
20impl TaskManager {
21    /// Async task to handle all events for the ComHub
22    pub(crate) fn create() -> (TaskManager, impl Future<Output = ()>) {
23        let (sender, mut receiver) = create_unbounded_channel::<TaskFuture>();
24
25        (
26            TaskManager {
27                task_sender: RefCell::new(sender),
28            },
29            async move {
30                let mut tasks = FuturesUnordered::<TaskFuture>::new();
31
32                // iterate over new_socket_iterators
33                loop {
34                    // check for new sockets from all iterators
35                    select! {
36                        // Poll for completed futures
37                        Some(_) = tasks.next() => {}
38
39                        // Poll for new futures from channel
40                        Some(new_fut) = receiver.next() => {
41                            tasks.push(new_fut);
42                        }
43                        complete => unreachable!(),
44                    }
45                }
46            },
47        )
48    }
49
50    /// Registers a new task on the ComHub
51    pub(crate) fn register_task<F>(&self, fut: F)
52    where
53        F: Future<Output = ()> + 'static,
54    {
55        self.task_sender
56            .borrow_mut()
57            .start_send(Box::pin(fut))
58            .unwrap();
59    }
60}