hyperfile_reactor/
lib.rs

1use std::sync::Arc;
2use tokio::runtime::{Builder, Runtime};
3use tokio::sync::{mpsc, oneshot};
4use tokio::task::LocalSet;
5
6// based on example code:
7// https://docs.rs/tokio/latest/tokio/task/struct.LocalSet.html#use-inside-tokiospawn
8
/// Handle returned to the user by [`Task::start`].
///
/// Use [`TaskHandler::send`] to submit a task context into the main handler loop.
/// Channel priority: cb > highprio > tx.
pub struct TaskHandler<T> {
    // Normal-priority channel; written by `send`.
    tx: mpsc::UnboundedSender<T>,
    // High-priority channel; written by `send_highprio`.
    highprio_tx: mpsc::UnboundedSender<T>,
    // Callback channel (highest priority); written by `send_cb`.
    cb_tx: mpsc::UnboundedSender<T>,
}
18
19impl<T> Clone for TaskHandler<T> {
20    fn clone(&self) -> Self {
21        Self {
22            tx: self.tx.clone(),
23            highprio_tx: self.highprio_tx.clone(),
24            cb_tx: self.cb_tx.clone(),
25        }
26    }
27}
28
29impl<T> TaskHandler<T> {
30    pub fn send(&self, ctx: T) {
31        if let Err(e) = self.tx.send(ctx) {
32            panic!("failed to send context through mpsc channel, reason: {e}");
33        }
34    }
35
36    pub fn send_highprio(&self, ctx: T) {
37        if let Err(e) = self.highprio_tx.send(ctx) {
38            panic!("failed to send context through high prio mpsc channel, reason: {e}");
39        }
40    }
41
42    pub fn send_cb(&self, ctx: T) {
43        if let Err(e) = self.cb_tx.send(ctx) {
44            panic!("failed to send context through callback mpsc channel, reason: {e}");
45        }
46    }
47}
48
49pub trait Task<T: 'static> {
50
51    /// Handler need to be implement by user to process a single task.
52    ///
53    /// **Be careful:** since this handler running inside of a single thread,
54    /// never blocking it, all IO operations should be spawn out.
55    fn handler(&mut self, ctx: T) -> impl std::future::Future<Output = ()>;
56
57    /// Start main handler loop.
58    fn start(mut self) -> TaskHandler<T> where Self: Sized + 'static {
59        use futures_lite::future;
60        let (tx, mut rx) = mpsc::unbounded_channel::<T>();
61        let (highprio_tx, mut highprio_rx) = mpsc::unbounded_channel::<T>();
62        let (cb_tx, mut cb_rx) = mpsc::unbounded_channel::<T>();
63        tokio::task::spawn_local(async move {
64            while let Some(ctx) = future::or(cb_rx.recv(), future::or(highprio_rx.recv(), rx.recv())).await {
65                self.handler(ctx).await;
66            }
67        });
68        TaskHandler { tx: tx, highprio_tx: highprio_tx, cb_tx: cb_tx }
69    }
70}
71
/// Cloneable handle used to submit tasks of type `T` to the thread started by
/// [`LocalSpawner::new`].
///
/// `C` is the context type accepted by the resulting [`TaskHandler`].
pub struct LocalSpawner<C, T> {
   // Each message carries the task to start together with the oneshot sender
   // through which its `TaskHandler` is handed back to the caller.
   send: mpsc::UnboundedSender<(T, oneshot::Sender<TaskHandler<C>>)>,
}
75
76impl<C, T> Clone for LocalSpawner<C, T> {
77    fn clone(&self) -> Self {
78        Self {
79            send: self.send.clone(),
80        }
81    }
82}
83
84impl<C: 'static + Send, T: Task<C> + 'static + Send> LocalSpawner<C, T> {
85    /// Create a new `LocalSpawner` instance use `[tokio::runtime::Builder::new_current_thread]`.
86    pub fn new_current() -> Self {
87        Self::new(None)
88    }
89
90    /// Create a new `LocalSpawner` instance use supplied `[tokio::runtime::Runtime]`.
91    pub fn new(runtime: Option<Arc<Runtime>>) -> Self {
92        let (send, mut recv) = mpsc::unbounded_channel::<(T, oneshot::Sender<TaskHandler<C>>)>();
93
94        let rt = if let Some(r) = runtime {
95            r.clone()
96        } else {
97            let r = Builder::new_current_thread()
98                .enable_all()
99                .build()
100                .unwrap();
101            Arc::new(r)
102        };
103
104        std::thread::spawn(move || {
105            let local = LocalSet::new();
106
107            local.spawn_local(async move {
108                while let Some((task, tx)) = recv.recv().await {
109                    let task_handle = task.start();
110                    // send task_handle to external
111                    let _ = tx.send(task_handle);
112                }
113                // If the while loop returns, then all the LocalSpawner
114                // objects have been dropped.
115            });
116
117            // This will return once all senders are dropped and all
118            // spawned tasks have returned.
119            rt.block_on(local);
120        });
121
122        Self {
123            send,
124        }
125    }
126
127    /// Spawn a task
128    ///
129    /// This will kick task to start it's main handler loop.
130    ///
131    /// User need to prepare a `[tokio::sync::oneshot]` channel to receive notification
132    /// when task successful started.
133    ///
134    /// # Example
135    ///
136    /// ```
137    /// let spawner = LocalSpawner::new_current();
138    /// let (tx, rx) = oneshot::channel();
139    /// let file = File::new(1);
140    /// spawner.spawn(file, tx);
141    /// let handler1 = rx.blocking_recv().expect("failed to get back file handler");
142    /// ```
143    pub fn spawn(&self, task: T, tx: oneshot::Sender<TaskHandler<C>>) {
144        self.send.send((task, tx)).expect("Thread with LocalSet has shut down.");
145    }
146}