Skip to main content

altair_concurrent/
task_map.rs

1//! Builder for a set of named concurrent tasks.
2
3use futures::future::BoxFuture;
4use std::collections::BTreeMap;
5use tokio_util::sync::CancellationToken;
6
7type BoxedTaskFn<T> =
8    Box<dyn FnOnce(CancellationToken) -> BoxFuture<'static, Result<T, BoxedError>> + Send>;
9
10type BoxedError = Box<dyn std::error::Error + Send + Sync>;
11
12/// A set of named tasks to run concurrently.
13///
14/// `T` is the success result type — all tasks in a `TaskMap` produce the
15/// same `T`. For heterogeneous batches, use `tokio::join!` directly.
16pub struct TaskMap<T> {
17    pub(crate) tasks: BTreeMap<&'static str, BoxedTaskFn<T>>,
18}
19
20impl<T> TaskMap<T> {
21    /// Create an empty task map.
22    #[must_use]
23    pub fn new() -> Self {
24        Self {
25            tasks: BTreeMap::new(),
26        }
27    }
28
29    /// Insert a named task into the map.
30    ///
31    /// The closure receives the active [`CancellationToken`] and must return
32    /// a future producing `Result<T, E>` where `E` can be boxed into a
33    /// `std::error::Error`.
34    ///
35    /// # Duplicate names
36    ///
37    /// Backed by `BTreeMap::insert` — calling `insert` twice with the
38    /// same `name` **silently overwrites** the earlier task ("last write
39    /// wins"). If you need duplicate-name detection, check `.len()`
40    /// before/after the second insert or use distinct names.
41    #[must_use]
42    pub fn insert<F, Fut, E>(mut self, name: &'static str, task: F) -> Self
43    where
44        F: FnOnce(CancellationToken) -> Fut + Send + 'static,
45        Fut: std::future::Future<Output = std::result::Result<T, E>> + Send + 'static,
46        E: Into<BoxedError>,
47        T: Send + 'static,
48    {
49        let boxed: BoxedTaskFn<T> = Box::new(move |token| {
50            let fut = task(token);
51            Box::pin(async move { fut.await.map_err(Into::into) })
52        });
53        self.tasks.insert(name, boxed);
54        self
55    }
56
57    /// Return the number of tasks currently in the map.
58    #[must_use]
59    pub fn len(&self) -> usize {
60        self.tasks.len()
61    }
62
63    /// Return `true` if no tasks have been inserted.
64    #[must_use]
65    pub fn is_empty(&self) -> bool {
66        self.tasks.is_empty()
67    }
68}
69
70impl<T> Default for TaskMap<T> {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// A freshly built map holds nothing.
    #[test]
    fn new_is_empty() {
        let m: TaskMap<u32> = TaskMap::new();
        assert!(m.is_empty());
        assert_eq!(m.len(), 0);
    }

    /// Each distinct name adds one entry.
    #[test]
    fn insert_increments_len() {
        let m: TaskMap<u32> = TaskMap::new()
            .insert("a", |_| async { Ok::<_, std::io::Error>(1) })
            .insert("b", |_| async { Ok::<_, std::io::Error>(2) });
        assert_eq!(m.len(), 2);
    }

    /// Re-using a name keeps a single entry, and that entry is the task
    /// inserted last.
    #[tokio::test]
    async fn insert_duplicate_overwrites() {
        let m: TaskMap<u32> = TaskMap::new()
            .insert("a", |_| async { Ok::<_, std::io::Error>(1) })
            .insert("a", |_| async { Ok::<_, std::io::Error>(2) });
        assert_eq!(m.len(), 1);
        // `len() == 1` alone would also hold under first-write-wins, so run
        // the surviving task to pin the documented "last write wins" rule.
        let (name, task_fn) = m.tasks.into_iter().next().unwrap();
        assert_eq!(name, "a");
        let ct = tokio_util::sync::CancellationToken::new();
        let out = task_fn(ct).await.unwrap();
        assert_eq!(out, 2);
    }

    /// `Default` yields the same empty map as `new`.
    #[test]
    fn default_is_empty() {
        let m: TaskMap<u32> = TaskMap::default();
        assert!(m.is_empty());
        assert_eq!(m.len(), 0);
    }

    /// Three distinct names give three entries and a non-empty map.
    #[test]
    fn len_after_three_inserts() {
        let m: TaskMap<u32> = TaskMap::new()
            .insert("a", |_| async { Ok::<_, std::io::Error>(1) })
            .insert("b", |_| async { Ok::<_, std::io::Error>(2) })
            .insert("c", |_| async { Ok::<_, std::io::Error>(3) });
        assert_eq!(m.len(), 3);
        assert!(!m.is_empty());
    }

    /// The boxed closure stored by `insert` can be called and awaited.
    #[tokio::test]
    async fn task_closure_executes_after_insert() {
        let m: TaskMap<u32> =
            TaskMap::new().insert("only", |_| async { Ok::<_, std::io::Error>(99) });
        assert_eq!(m.len(), 1);
        // Pull the task out and run it manually to exercise the boxed closure path.
        let (_name, task_fn) = m.tasks.into_iter().next().unwrap();
        let ct = tokio_util::sync::CancellationToken::new();
        let out = task_fn(ct).await.unwrap();
        assert_eq!(out, 99);
    }
}