long_running_task/
lib.rs

1//! `long-running-task` is a simple library to handle and manage [long-running tasks](https://restfulapi.net/rest-api-design-for-long-running-tasks/).
2//! If you want to use this crate in combination with web-frameworks you probably want to enable the feature `serde`.
3
4#![warn(clippy::all, clippy::pedantic)]
5#![warn(missing_docs)]
6
7use std::{
8    collections::HashMap,
9    time::{Duration, Instant},
10};
11use uuid::Uuid;
12
13/// Structs implementing this trait hold the current progress of a task.
14pub trait Progressible {
15    /// Report progress on a task, for example by increasing a progress field by 1.
16    fn progress(&mut self);
17}
18
19#[derive(PartialEq, Eq, Debug)]
20#[cfg_attr(feature = "serde", derive(serde::Serialize))]
21/// Representation of a task's state.
22pub enum TaskState<V, P>
23where
24    P: Progressible,
25{
26    /// The task is not finished yet and holds its current progress.
27    Pending(P),
28    /// The task is done with a value of type V.
29    Done(V),
30}
31
32/// A pool to manage long-running tasks.
33pub struct TaskPool<V, P>
34where
35    P: Progressible,
36{
37    pending: HashMap<Uuid, P>,
38    completed: HashMap<Uuid, (Instant, V)>,
39    lifespan: Option<Duration>,
40}
41
42impl<V, P> Default for TaskPool<V, P>
43where
44    P: Progressible,
45{
46    fn default() -> Self {
47        Self {
48            pending: HashMap::new(),
49            completed: HashMap::new(),
50            lifespan: None,
51        }
52    }
53}
54
55/// A unique handle to a single task.
56/// Does not implement clone or expose its inner fields
57/// because it must be a unique reference to the task.
58pub struct Handle {
59    uuid: Uuid,
60}
61
62impl<V, P> TaskPool<V, P>
63where
64    P: Progressible + Clone,
65{
66    /// Configure the lifespan of tasks.
67    /// `None` means that tasks will never expire.
68    /// Expired tasks are purged as soon as `complete` is invoked.
69    /// Specifying a lifespan is useful because clients might not always retrieve completed tasks.
70    /// Without configuring a lifespan, such abandoned tasks accumulate over time and fill up memory.
71    #[must_use]
72    pub fn with_lifespan(mut self, lifespan: Option<Duration>) -> Self {
73        self.lifespan = lifespan;
74        self
75    }
76
77    /// Insert a task's initial progress to receive a `Handle` and `Uuid`
78    /// referencing the task.
79    #[must_use]
80    pub fn insert(&mut self, pending: P) -> (Handle, Uuid) {
81        let uuid = Uuid::new_v4();
82        self.pending.insert(uuid, pending);
83        (Handle { uuid }, uuid)
84    }
85
86    /// Get the task state and remove it from the pool if it is done.
87    pub fn retrieve(&mut self, uuid: &Uuid) -> Option<TaskState<V, P>> {
88        use TaskState::{Done, Pending};
89
90        if let Some(p) = self.pending.get(uuid) {
91            return Some(Pending(p.clone()));
92        }
93
94        self.completed.remove(uuid).map(|f| Done(f.1))
95    }
96
97    /// Report progress on a pending task.
98    /// Calls `Progressible::progress` on the corresponding progress state.
99    pub fn progress(&mut self, handle: &Handle) {
100        match self.pending.get_mut(&handle.uuid) {
101            Some(p) => p.progress(),
102            None => unreachable!("Pending task not found. This should never happen because a task's handle cannot outlive the task."),
103        }
104    }
105
106    /// Mark the task associated to the handle as completed with a value of type T.
107    /// The handle must be passed by value so that this is the final action.
108    /// As a side effect expired tasks are purged.
109    #[allow(clippy::needless_pass_by_value)]
110    pub fn complete(&mut self, handle: Handle, value: V) {
111        self.pending.remove(&handle.uuid);
112        self.purge_expired_tasks();
113        self.completed.insert(handle.uuid, (Instant::now(), value));
114    }
115
116    fn purge_expired_tasks(&mut self) {
117        if let Some(lifespan) = self.lifespan {
118            let now = Instant::now();
119            self.completed
120                .retain(|_, (inserted_at, _)| now.duration_since(*inserted_at) < lifespan);
121        }
122    }
123}
124
125#[cfg(test)]
126mod tests {
127    #[derive(Clone, Debug, PartialEq, Eq)]
128    struct Progress {
129        pub progress: usize,
130        pub total: usize,
131    }
132
133    #[derive(Clone, Debug, PartialEq, Eq)]
134    struct EmptyProgress {}
135
136    impl Progressible for EmptyProgress {
137        fn progress(&mut self) {}
138    }
139
140    impl Progressible for Progress {
141        fn progress(&mut self) {
142            self.progress = (self.progress + 1).min(self.total);
143        }
144    }
145
146    use std::{thread, time::Duration};
147
148    use super::Progressible;
149    use crate::{TaskPool, TaskState::*};
150
151    #[test]
152    fn insert_and_get() {
153        let mut pool = TaskPool::<u8, Progress>::default();
154        let initial_value = Progress {
155            progress: 0,
156            total: 7,
157        };
158
159        let (handle, uuid) = pool.insert(initial_value);
160        assert_eq!(
161            pool.retrieve(&uuid),
162            Some(Pending(Progress {
163                progress: 0,
164                total: 7
165            }))
166        );
167
168        pool.progress(&handle);
169        assert_eq!(
170            pool.retrieve(&uuid),
171            Some(Pending(Progress {
172                progress: 1,
173                total: 7
174            }))
175        );
176
177        pool.complete(handle, 42);
178
179        assert_eq!(get_inner_size(&pool), 1);
180        assert_eq!(pool.retrieve(&uuid), Some(Done(42)));
181        assert_eq!(get_inner_size(&pool), 0);
182        assert_eq!(pool.retrieve(&uuid), None);
183    }
184
185    #[test]
186    fn exceed_lifespan() {
187        let lifespan = Duration::from_millis(10);
188        let mut pool = TaskPool::<(), EmptyProgress>::default().with_lifespan(Some(lifespan));
189
190        let id = insert_and_complete(&mut pool);
191        thread::sleep(lifespan); // exceed time
192        insert_and_complete(&mut pool); // trigger purge by completing new task
193
194        assert_eq!(pool.retrieve(&id), None);
195    }
196
197    #[test]
198    fn within_lifespan() {
199        let lifespan = Duration::from_millis(10);
200        let mut pool = TaskPool::<(), EmptyProgress>::default().with_lifespan(Some(lifespan));
201
202        let id = insert_and_complete(&mut pool);
203        insert_and_complete(&mut pool); // trigger purge by completing new task
204
205        assert_eq!(pool.retrieve(&id), Some(Done(())));
206    }
207
208    fn insert_and_complete(pool: &mut TaskPool<(), EmptyProgress>) -> uuid::Uuid {
209        let (handle, id) = pool.insert(EmptyProgress {});
210        pool.complete(handle, ());
211        id
212    }
213
214    fn get_inner_size<V, P>(pool: &TaskPool<V, P>) -> usize
215    where
216        P: Progressible,
217    {
218        pool.pending.len() + pool.completed.len()
219    }
220}