tokio_tree_context/
lib.rs

1use std::{future::Future, time::Duration};
2use tokio::sync::broadcast::Sender;
3use tokio::{sync::broadcast, time::Instant};
4
5/// A context that can be used to spawn tokio tasks
6/// Cancelling the context (or dropping it) will cancel all async tasks spawn by this context
7/// You can create child context too.
8/// ```rust, no_run
9///    use std::time::Duration;
10///    use tokio_tree_context::Context;
11///    async fn testing() {
12///        let mut ctx = Context::new();
13///        
14///        let mut ctx1 = ctx.new_child_context();
15///        let mut ctx12 = ctx1.new_child_context();
16///        
17///        ctx.spawn(async move {
18///            sleep("ctx".into(), 100).await;
19///        });
20///        ctx1.spawn(async move {
21///            sleep("ctx1".into(), 100).await;
22///        });
23///        ctx12.spawn(async move {
24///            sleep("ctx12".into(), 100).await;
25///        });
26///        println!("Cancelling CTX 1");
27///        drop(ctx1);
28///        sleep("main".into(), 5).await;
29///        println!("Cancelling CTX 12");
30///        drop(ctx12);
31///        sleep("main".into(), 5).await;
32///        
33///        println!("Cancelling CTX");
34///        drop(ctx);
35///        
36///        sleep("main".into(), 5).await;
37///        
38///    }
39///        
40///    async fn sleep(name:String, what: u64) {
41///        for i in 0..what {
42///            println!("Task {} sleeping {} out of {} seconds", name, i + 1, what);
43///            tokio::time::sleep(Duration::from_secs(1)).await;
44///            println!("Task {} awake", name);
45///        }
46///    }
47/// ```
48pub struct Context {
49    cancel_sender: Sender<()>,
50}
51
52impl Context {
53    /// Cancel all tasks under this context
54    pub fn cancel(self) {}
55
56    /// Create a new context
57    pub fn new() -> Context {
58        let (tx, _) = broadcast::channel(1);
59        Context {
60            cancel_sender: tx,
61        }
62    }
63
64    /// Create a new Context from a parent. Same as `parent.new_child_context()`
65    pub fn with_parent(parent: &mut Context) -> Context {
66        return parent.new_child_context();
67    }
68
69    /// Create a new child context, where cancelling the parent context, will also cancel the child context.
70    /// Child context can have their own child context too.
71    /// 
72    /// The new context has a logical relationship with the parent. Cancelling parent will cancel child too.
73    pub fn new_child_context(&mut self) -> Context {
74        let (new_tx, _) = broadcast::channel(1);
75        let new_tx_clone = new_tx.clone();
76        let wsender= new_tx_clone.downgrade();
77        drop(new_tx_clone);
78        let mut rx = self.cancel_sender.subscribe();
79        tokio::spawn(async move {
80            let _ = rx.recv().await;
81            wsender.upgrade().map(|x| x.send(()))
82        });
83        Context {
84            cancel_sender: new_tx
85        }
86    }
87
88    /// Run a task with at timeout. If timeout is None, then no timeout is used
89    /// Task will run until:
90    ///     The task is completed
91    ///     The timeout is reached
92    ///     The context is cancelled
93    ///     Any of the parent/ancestor context is cancelled
94    /// 
95    /// Which ever is earlier.
96    /// For example
97    /// ```rust,no_run
98    /// use std::time::Duration;
99    /// use tokio_tree_context::Context;
100    /// 
101    /// let mut ctx = Context::new();
102    /// ctx.spawn_with_timeout(async move {
103    ///     // do your work here
104    /// }, Some(Duration::from_secs(3))); // task cancels after 3 seconds
105    /// // wait sometime
106    /// ctx.cancel();
107    /// ```
108    pub fn spawn_with_timeout<T>(&mut self, future: T, timeout: Option<Duration>) -> tokio::task::JoinHandle<Option<T::Output>>
109    where
110        T: Future + Send + 'static,
111        T::Output: Send + 'static,
112    {
113        let mut rx = self.cancel_sender.subscribe();
114        if let Some(duration) = timeout {
115            tokio::task::spawn(async move {
116                tokio::select! {
117                    res = future => Some(res),
118                    _ = rx.recv() => None,
119                    _ = tokio::time::sleep_until(Instant::now() + duration) => None,
120                }
121            })
122        } else {
123            tokio::task::spawn(async move {
124                tokio::select! {
125                    res = future => Some(res),
126                    _ = rx.recv() => None,
127                }
128            })
129        }
130    }
131
132    /// Spawn task without tiemout
133    /// Task is cancelled when you call this context's cancel or drop the context
134    /// 
135    /// For example
136    /// ```rust, no_run
137    /// use std::time::Duration;
138    /// use tokio_tree_context::Context;
139    /// 
140    /// let mut ctx = Context::new();
141    /// ctx.spawn(async move {
142    ///     // do your work here
143    /// });
144    /// // wait sometime
145    /// ctx.cancel();
146    /// ```
147    pub fn spawn<T>(&mut self, future: T) -> tokio::task::JoinHandle<Option<T::Output>>
148    where
149        T: Future + Send + 'static,
150        T::Output: Send + 'static,
151    {
152        self.spawn_with_timeout(future, None)
153    }
154}
155
#[cfg(test)]
mod tests {
    use super::*;

    /// Dropping a context cancels its own tasks and those of its descendants, while
    /// the parent's tasks keep running until the parent itself is dropped.
    #[tokio::test]
    async fn it_works() {
        let mut ctx = Context::new();

        let mut ctx1 = ctx.new_child_context();
        let mut ctx12 = ctx1.new_child_context();

        let root_task = ctx.spawn(async move {
            sleep("ctx".into(), 100).await;
        });
        let child_task = ctx1.spawn(async move {
            sleep("ctx1".into(), 100).await;
        });
        let grandchild_task = ctx12.spawn(async move {
            sleep("ctx12".into(), 100).await;
        });

        println!("Cancelling CTX 1");
        drop(ctx1);
        // Awaiting the handles replaces the former fixed sleeps: they resolve as soon
        // as the cancellation has been delivered.
        assert_eq!(child_task.await.expect("task panicked"), None);
        assert_eq!(grandchild_task.await.expect("task panicked"), None);
        assert!(!root_task.is_finished());

        println!("Cancelling CTX 12");
        drop(ctx12);

        println!("Cancelling CTX");
        drop(ctx);
        assert_eq!(root_task.await.expect("task panicked"), None);
    }

    /// A future that finishes on its own yields `Some(output)`.
    #[tokio::test]
    async fn completed_task_yields_its_output() {
        let mut ctx = Context::new();
        let handle = ctx.spawn(async { 7 });
        assert_eq!(handle.await.expect("task panicked"), Some(7));
    }

    /// A future that never finishes is stopped by the timeout and yields `None`.
    #[tokio::test]
    async fn timeout_stops_the_task() {
        let mut ctx = Context::new();
        let handle = ctx.spawn_with_timeout(
            std::future::pending::<()>(),
            Some(Duration::from_millis(10)),
        );
        assert_eq!(handle.await.expect("task panicked"), None);
    }

    /// An explicit `cancel()` behaves like dropping the context.
    #[tokio::test]
    async fn cancel_stops_running_tasks() {
        let mut ctx = Context::new();
        let handle = ctx.spawn(std::future::pending::<()>());
        ctx.cancel();
        assert_eq!(handle.await.expect("task panicked"), None);
    }

    /// Test helper: sleep `what` seconds one second at a time, logging each step.
    async fn sleep(name:String, what: u64) {
        for i in 0..what {
            println!("Task {} sleeping {} out of {} seconds", name, i + 1, what);
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("Task {} awake", name);
        }
    }
}