1#![warn(clippy::all, clippy::pedantic)]
5#![warn(missing_docs)]
6
7use std::{
8 collections::HashMap,
9 time::{Duration, Instant},
10};
11use uuid::Uuid;
12
/// A value that tracks how far along a pending task is.
///
/// A [`TaskPool`] stores one such value per pending task and calls
/// [`Progressible::progress`] on it whenever [`TaskPool::progress`] is invoked
/// for that task.
pub trait Progressible {
    /// Records one step of progress on this value.
    ///
    /// What a "step" means is up to the implementor; the pool only calls this
    /// method and never inspects the result.
    fn progress(&mut self);
}
18
19#[derive(PartialEq, Eq, Debug)]
20#[cfg_attr(feature = "serde", derive(serde::Serialize))]
21pub enum TaskState<V, P>
23where
24 P: Progressible,
25{
26 Pending(P),
28 Done(V),
30}
31
32pub struct TaskPool<V, P>
34where
35 P: Progressible,
36{
37 pending: HashMap<Uuid, P>,
38 completed: HashMap<Uuid, (Instant, V)>,
39 lifespan: Option<Duration>,
40}
41
42impl<V, P> Default for TaskPool<V, P>
43where
44 P: Progressible,
45{
46 fn default() -> Self {
47 Self {
48 pending: HashMap::new(),
49 completed: HashMap::new(),
50 lifespan: None,
51 }
52 }
53}
54
55pub struct Handle {
59 uuid: Uuid,
60}
61
62impl<V, P> TaskPool<V, P>
63where
64 P: Progressible + Clone,
65{
66 #[must_use]
72 pub fn with_lifespan(mut self, lifespan: Option<Duration>) -> Self {
73 self.lifespan = lifespan;
74 self
75 }
76
77 #[must_use]
80 pub fn insert(&mut self, pending: P) -> (Handle, Uuid) {
81 let uuid = Uuid::new_v4();
82 self.pending.insert(uuid, pending);
83 (Handle { uuid }, uuid)
84 }
85
86 pub fn retrieve(&mut self, uuid: &Uuid) -> Option<TaskState<V, P>> {
88 use TaskState::{Done, Pending};
89
90 if let Some(p) = self.pending.get(uuid) {
91 return Some(Pending(p.clone()));
92 }
93
94 self.completed.remove(uuid).map(|f| Done(f.1))
95 }
96
97 pub fn progress(&mut self, handle: &Handle) {
100 match self.pending.get_mut(&handle.uuid) {
101 Some(p) => p.progress(),
102 None => unreachable!("Pending task not found. This should never happen because a task's handle cannot outlive the task."),
103 }
104 }
105
106 #[allow(clippy::needless_pass_by_value)]
110 pub fn complete(&mut self, handle: Handle, value: V) {
111 self.pending.remove(&handle.uuid);
112 self.purge_expired_tasks();
113 self.completed.insert(handle.uuid, (Instant::now(), value));
114 }
115
116 fn purge_expired_tasks(&mut self) {
117 if let Some(lifespan) = self.lifespan {
118 let now = Instant::now();
119 self.completed
120 .retain(|_, (inserted_at, _)| now.duration_since(*inserted_at) < lifespan);
121 }
122 }
123}
124
#[cfg(test)]
mod tests {
    use std::{thread, time::Duration};

    use super::{
        Progressible, TaskPool,
        TaskState::{Done, Pending},
    };

    /// Progress tracker that counts steps up to a fixed total.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct Progress {
        pub progress: usize,
        pub total: usize,
    }

    impl Progressible for Progress {
        fn progress(&mut self) {
            // Saturate at `total` so extra calls never overshoot.
            self.progress = (self.progress + 1).min(self.total);
        }
    }

    /// Progress tracker for tests that do not care about progress at all.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct EmptyProgress {}

    impl Progressible for EmptyProgress {
        fn progress(&mut self) {}
    }

    /// Inserts a task, completes it right away and returns its id.
    fn insert_and_complete(pool: &mut TaskPool<(), EmptyProgress>) -> uuid::Uuid {
        let (handle, id) = pool.insert(EmptyProgress {});
        pool.complete(handle, ());
        id
    }

    /// Total number of entries (pending and completed) held by the pool.
    fn get_inner_size<V, P>(pool: &TaskPool<V, P>) -> usize
    where
        P: Progressible,
    {
        pool.pending.len() + pool.completed.len()
    }

    #[test]
    fn insert_and_get() {
        let mut pool = TaskPool::<u8, Progress>::default();
        let initial_value = Progress {
            progress: 0,
            total: 7,
        };

        let (handle, uuid) = pool.insert(initial_value);
        assert_eq!(
            pool.retrieve(&uuid),
            Some(Pending(Progress {
                progress: 0,
                total: 7
            }))
        );

        pool.progress(&handle);
        assert_eq!(
            pool.retrieve(&uuid),
            Some(Pending(Progress {
                progress: 1,
                total: 7
            }))
        );

        pool.complete(handle, 42);

        // A completed value stays in the pool until it is retrieved once.
        assert_eq!(get_inner_size(&pool), 1);
        assert_eq!(pool.retrieve(&uuid), Some(Done(42)));
        assert_eq!(get_inner_size(&pool), 0);
        assert_eq!(pool.retrieve(&uuid), None);
    }

    #[test]
    fn exceed_lifespan() {
        let lifespan = Duration::from_millis(10);
        let mut pool = TaskPool::<(), EmptyProgress>::default().with_lifespan(Some(lifespan));

        let id = insert_and_complete(&mut pool);
        thread::sleep(lifespan);
        // Purging only happens while completing a task, so complete another one.
        insert_and_complete(&mut pool);

        assert_eq!(pool.retrieve(&id), None);
    }

    #[test]
    fn within_lifespan() {
        // Generous lifespan: with only a few milliseconds, a scheduler stall
        // between the two completions would expire the first entry and make
        // this test fail spuriously.
        let lifespan = Duration::from_secs(60);
        let mut pool = TaskPool::<(), EmptyProgress>::default().with_lifespan(Some(lifespan));

        let id = insert_and_complete(&mut pool);
        // Trigger a purge; the first entry is still fresh and must survive it.
        insert_and_complete(&mut pool);

        assert_eq!(pool.retrieve(&id), Some(Done(())));
    }
}