tokio_tree_context/lib.rs
1use std::{future::Future, time::Duration};
2use tokio::sync::broadcast::Sender;
3use tokio::{sync::broadcast, time::Instant};
4
5/// A context that can be used to spawn tokio tasks
6/// Cancelling the context (or dropping it) will cancel all async tasks spawn by this context
7/// You can create child context too.
8/// ```rust, no_run
9/// use std::time::Duration;
10/// use tokio_tree_context::Context;
11/// async fn testing() {
12/// let mut ctx = Context::new();
13///
14/// let mut ctx1 = ctx.new_child_context();
15/// let mut ctx12 = ctx1.new_child_context();
16///
17/// ctx.spawn(async move {
18/// sleep("ctx".into(), 100).await;
19/// });
20/// ctx1.spawn(async move {
21/// sleep("ctx1".into(), 100).await;
22/// });
23/// ctx12.spawn(async move {
24/// sleep("ctx12".into(), 100).await;
25/// });
26/// println!("Cancelling CTX 1");
27/// drop(ctx1);
28/// sleep("main".into(), 5).await;
29/// println!("Cancelling CTX 12");
30/// drop(ctx12);
31/// sleep("main".into(), 5).await;
32///
33/// println!("Cancelling CTX");
34/// drop(ctx);
35///
36/// sleep("main".into(), 5).await;
37///
38/// }
39///
40/// async fn sleep(name:String, what: u64) {
41/// for i in 0..what {
42/// println!("Task {} sleeping {} out of {} seconds", name, i + 1, what);
43/// tokio::time::sleep(Duration::from_secs(1)).await;
44/// println!("Task {} awake", name);
45/// }
46/// }
47/// ```
48pub struct Context {
49 cancel_sender: Sender<()>,
50}
51
52impl Context {
53 /// Cancel all tasks under this context
54 pub fn cancel(self) {}
55
56 /// Create a new context
57 pub fn new() -> Context {
58 let (tx, _) = broadcast::channel(1);
59 Context {
60 cancel_sender: tx,
61 }
62 }
63
64 /// Create a new Context from a parent. Same as `parent.new_child_context()`
65 pub fn with_parent(parent: &mut Context) -> Context {
66 return parent.new_child_context();
67 }
68
69 /// Create a new child context, where cancelling the parent context, will also cancel the child context.
70 /// Child context can have their own child context too.
71 ///
72 /// The new context has a logical relationship with the parent. Cancelling parent will cancel child too.
73 pub fn new_child_context(&mut self) -> Context {
74 let (new_tx, _) = broadcast::channel(1);
75 let new_tx_clone = new_tx.clone();
76 let wsender= new_tx_clone.downgrade();
77 drop(new_tx_clone);
78 let mut rx = self.cancel_sender.subscribe();
79 tokio::spawn(async move {
80 let _ = rx.recv().await;
81 wsender.upgrade().map(|x| x.send(()))
82 });
83 Context {
84 cancel_sender: new_tx
85 }
86 }
87
88 /// Run a task with at timeout. If timeout is None, then no timeout is used
89 /// Task will run until:
90 /// The task is completed
91 /// The timeout is reached
92 /// The context is cancelled
93 /// Any of the parent/ancestor context is cancelled
94 ///
95 /// Which ever is earlier.
96 /// For example
97 /// ```rust,no_run
98 /// use std::time::Duration;
99 /// use tokio_tree_context::Context;
100 ///
101 /// let mut ctx = Context::new();
102 /// ctx.spawn_with_timeout(async move {
103 /// // do your work here
104 /// }, Some(Duration::from_secs(3))); // task cancels after 3 seconds
105 /// // wait sometime
106 /// ctx.cancel();
107 /// ```
108 pub fn spawn_with_timeout<T>(&mut self, future: T, timeout: Option<Duration>) -> tokio::task::JoinHandle<Option<T::Output>>
109 where
110 T: Future + Send + 'static,
111 T::Output: Send + 'static,
112 {
113 let mut rx = self.cancel_sender.subscribe();
114 if let Some(duration) = timeout {
115 tokio::task::spawn(async move {
116 tokio::select! {
117 res = future => Some(res),
118 _ = rx.recv() => None,
119 _ = tokio::time::sleep_until(Instant::now() + duration) => None,
120 }
121 })
122 } else {
123 tokio::task::spawn(async move {
124 tokio::select! {
125 res = future => Some(res),
126 _ = rx.recv() => None,
127 }
128 })
129 }
130 }
131
132 /// Spawn task without tiemout
133 /// Task is cancelled when you call this context's cancel or drop the context
134 ///
135 /// For example
136 /// ```rust, no_run
137 /// use std::time::Duration;
138 /// use tokio_tree_context::Context;
139 ///
140 /// let mut ctx = Context::new();
141 /// ctx.spawn(async move {
142 /// // do your work here
143 /// });
144 /// // wait sometime
145 /// ctx.cancel();
146 /// ```
147 pub fn spawn<T>(&mut self, future: T) -> tokio::task::JoinHandle<Option<T::Output>>
148 where
149 T: Future + Send + 'static,
150 T::Output: Send + 'static,
151 {
152 self.spawn_with_timeout(future, None)
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::task::JoinHandle;

    /// Generous upper bound for joining a cancelled task; only reached when cancellation is
    /// broken, so a healthy run finishes almost immediately.
    const JOIN_DEADLINE: Duration = Duration::from_secs(5);

    /// Cancelling a child stops the child's subtree only; cancelling the root stops the rest.
    #[tokio::test]
    async fn it_works() {
        let mut ctx = Context::new();

        let mut ctx1 = ctx.new_child_context();
        let mut ctx12 = ctx1.new_child_context();

        let root_task = ctx.spawn(async move {
            sleep("ctx".into(), 100).await;
        });
        let child_task = ctx1.spawn(async move {
            sleep("ctx1".into(), 100).await;
        });
        let grandchild_task = ctx12.spawn(async move {
            sleep("ctx12".into(), 100).await;
        });

        println!("Cancelling CTX 1");
        drop(ctx1);
        // Dropping the child must stop its own task and, through the forwarder, the
        // grandchild's task as well.
        assert_cancelled("ctx1", child_task).await;
        assert_cancelled("ctx12", grandchild_task).await;
        assert!(
            !root_task.is_finished(),
            "cancelling a child must not cancel the parent's tasks"
        );

        println!("Cancelling CTX 12");
        drop(ctx12);
        assert!(
            !root_task.is_finished(),
            "dropping an already cancelled grandchild must not affect the root"
        );

        println!("Cancelling CTX");
        drop(ctx);
        assert_cancelled("ctx", root_task).await;
    }

    /// Join `handle` and assert that the task was stopped before completing (`None` output).
    async fn assert_cancelled(name: &str, handle: JoinHandle<Option<()>>) {
        let joined = tokio::time::timeout(JOIN_DEADLINE, handle)
            .await
            .unwrap_or_else(|_| panic!("task {} was not cancelled in time", name));
        let output = joined.unwrap_or_else(|err| panic!("task {} failed to join: {}", name, err));
        assert!(
            output.is_none(),
            "task {} completed instead of being cancelled",
            name
        );
    }

    /// Long-running stand-in workload: sleeps `what` times for one second, logging progress.
    async fn sleep(name: String, what: u64) {
        for i in 0..what {
            println!("Task {} sleeping {} out of {} seconds", name, i + 1, what);
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("Task {} awake", name);
        }
    }
}