tokio_async_utils/
task_map.rs

1use std::{
2    collections::HashMap,
3    ops::{Deref, DerefMut},
4};
5
6use crate::TaskHandle;
7
8/// Map of tasks that can store TaskHandles based on any key type.
9#[derive(Debug)]
10pub struct TaskMap<K, V> {
11    inner: HashMap<K, TaskHandle<V>>,
12}
13
14impl<K, V> Deref for TaskMap<K, V> {
15    type Target = HashMap<K, TaskHandle<V>>;
16
17    fn deref(&self) -> &Self::Target {
18        &self.inner
19    }
20}
21impl<K, V> DerefMut for TaskMap<K, V> {
22    fn deref_mut(&mut self) -> &mut Self::Target {
23        &mut self.inner
24    }
25}
26
27impl<K, V> TaskMap<K, V> {
28    pub fn new() -> Self {
29        Self {
30            inner: HashMap::new(),
31        }
32    }
33}
34
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::{sync::broadcast, time::sleep};

    use crate::TaskExt;

    use super::*;

    /// Checks that dropping a `TaskMap` stops the tasks stored in it.
    ///
    /// Two tasks are spawned, each owning a broadcast receiver and looping
    /// until `recv` fails. `broadcast::Sender::send` only succeeds while at
    /// least one receiver is alive, so a failing `send` after the map is
    /// dropped shows that both tasks (and their receivers) are gone.
    #[tokio::test]
    async fn is_dropped_correctly() {
        // The receiver returned by `channel` is discarded immediately; only
        // the receivers moved into the spawned tasks keep the channel open.
        let (tx, _) = broadcast::channel(20);
        let mut map = TaskMap::new();

        for key in 1..=2 {
            let mut rx = tx.subscribe();
            map.insert(
                key,
                tokio::spawn(async move { while rx.recv().await.is_ok() {} }).to_task_handle(),
            );
        }

        // Both task-owned receivers are subscribed, so this send must succeed.
        let r = tx.send(true);
        assert!(r.is_ok());

        drop(map);
        // Presumably the runtime needs a turn to actually drop the stopped
        // tasks and the receivers they own, so yield to it briefly.
        sleep(Duration::from_millis(1)).await;

        // With every receiver gone, sending has to fail.
        let r = tx.send(false);
        assert!(r.is_err(), "expected error, but got ok");
    }
}