Skip to main content

async_flow/tokio/scheduler/
serial.rs

1// This is free and unencumbered software released into the public domain.
2
3use crate::{Channel, Inputs, Outputs, Scheduler};
4use tokio::{
5    runtime::{Builder, Runtime},
6    task::LocalSet,
7};
8
9#[derive(Debug)]
10pub struct SerialScheduler {
11    tasks: Option<LocalSet>,
12    runtime: Runtime,
13}
14
15impl Scheduler for SerialScheduler {}
16
17impl SerialScheduler {
18    pub fn new() -> std::io::Result<Self> {
19        let runtime = Builder::new_current_thread().enable_all().build()?;
20        let tasks = Some(LocalSet::new());
21        Ok(Self { tasks, runtime })
22    }
23
24    #[cfg(feature = "tokio")]
25    pub fn id(&self) -> Option<tokio::runtime::Id> {
26        self.tasks.as_ref().map(|tasks| tasks.id())
27    }
28
29    pub fn spawn<F>(&self, process: F)
30    where
31        F: Future + 'static,
32        F::Output: 'static,
33    {
34        let _ = self.tasks.as_ref().unwrap().spawn_local(process);
35    }
36
37    pub fn create<T, F>(&self, block: F) -> (Outputs<T>, Inputs<T>)
38    where
39        F: AsyncFn(Inputs<T>, Outputs<T>) -> Result<(), crate::Error>,
40        F: Copy + 'static,
41        T: 'static,
42    {
43        let (input, output) = Channel::<T>::pair();
44        let _ = self
45            .tasks
46            .as_ref()
47            .unwrap()
48            .spawn_local(async move { block(input.rx, output.tx).await });
49        (input.tx, output.rx)
50    }
51
52    pub async fn run(&mut self) {
53        match self.tasks.take() {
54            None => (),
55            Some(tasks) => tasks.await,
56        }
57    }
58}
59
/// Gives borrowed access to the Tokio runtime owned by the scheduler.
#[cfg(feature = "tokio")]
impl AsRef<tokio::runtime::Runtime> for SerialScheduler {
    fn as_ref(&self) -> &tokio::runtime::Runtime {
        let Self { runtime, .. } = self;
        runtime
    }
}
66
/// Gives borrowed access to the handle of the scheduler's Tokio runtime.
#[cfg(feature = "tokio")]
impl AsRef<tokio::runtime::Handle> for SerialScheduler {
    fn as_ref(&self) -> &tokio::runtime::Handle {
        let runtime = &self.runtime;
        runtime.handle()
    }
}
73
/// Gives borrowed access to the scheduler's local task set.
///
/// # Panics
///
/// `as_ref` panics if the task set has already been taken out of the
/// scheduler (the `tasks` field is `None`).
#[cfg(feature = "tokio")]
impl AsRef<tokio::task::LocalSet> for SerialScheduler {
    fn as_ref(&self) -> &tokio::task::LocalSet {
        // `Option::as_ref` already yields `&LocalSet`; no extra borrow needed.
        self.tasks
            .as_ref()
            .expect("SerialScheduler: task set was already consumed by `run`")
    }
}