async_flow/tokio/scheduler/
serial.rs1use crate::{Channel, Inputs, Outputs, Scheduler};
4use tokio::{
5 runtime::{Builder, Runtime},
6 task::LocalSet,
7};
8
9#[derive(Debug)]
10pub struct SerialScheduler {
11 tasks: Option<LocalSet>,
12 runtime: Runtime,
13}
14
15impl Scheduler for SerialScheduler {}
16
17impl SerialScheduler {
18 pub fn new() -> std::io::Result<Self> {
19 let runtime = Builder::new_current_thread().enable_all().build()?;
20 let tasks = Some(LocalSet::new());
21 Ok(Self { tasks, runtime })
22 }
23
24 #[cfg(feature = "tokio")]
25 pub fn id(&self) -> Option<tokio::runtime::Id> {
26 self.tasks.as_ref().map(|tasks| tasks.id())
27 }
28
29 pub fn spawn<F>(&self, process: F)
30 where
31 F: Future + 'static,
32 F::Output: 'static,
33 {
34 let _ = self.tasks.as_ref().unwrap().spawn_local(process);
35 }
36
37 pub fn create<T, F>(&self, block: F) -> (Outputs<T>, Inputs<T>)
38 where
39 F: AsyncFn(Inputs<T>, Outputs<T>) -> Result<(), crate::Error>,
40 F: Copy + 'static,
41 T: 'static,
42 {
43 let (input, output) = Channel::<T>::pair();
44 let _ = self
45 .tasks
46 .as_ref()
47 .unwrap()
48 .spawn_local(async move { block(input.rx, output.tx).await });
49 (input.tx, output.rx)
50 }
51
52 pub async fn run(&mut self) {
53 match self.tasks.take() {
54 None => (),
55 Some(tasks) => tasks.await,
56 }
57 }
58}
59
#[cfg(feature = "tokio")]
impl AsRef<tokio::runtime::Runtime> for SerialScheduler {
    /// Borrows the Tokio runtime owned by this scheduler.
    fn as_ref(&self) -> &tokio::runtime::Runtime {
        let Self { runtime, .. } = self;
        runtime
    }
}
66
#[cfg(feature = "tokio")]
impl AsRef<tokio::runtime::Handle> for SerialScheduler {
    /// Borrows the handle of the Tokio runtime owned by this scheduler.
    fn as_ref(&self) -> &tokio::runtime::Handle {
        let Self { runtime, .. } = self;
        runtime.handle()
    }
}
73
#[cfg(feature = "tokio")]
impl AsRef<tokio::task::LocalSet> for SerialScheduler {
    /// Borrows the local task set owned by this scheduler.
    ///
    /// # Panics
    ///
    /// Panics if `run()` has already taken the task set out of the scheduler.
    fn as_ref(&self) -> &tokio::task::LocalSet {
        // `Option::as_ref` already yields `&LocalSet`; no extra borrow is needed.
        self.tasks
            .as_ref()
            .expect("SerialScheduler: the local task set was already consumed by `run()`")
    }
}