oxihuman_core/
async_queue.rs1#![allow(dead_code)]
4
5#[allow(dead_code)]
7#[derive(Debug, Clone)]
8pub struct AsyncQueue {
9 pending: Vec<AsyncTask>,
10 completed: Vec<AsyncTask>,
11 max_concurrent: usize,
12}
13
/// Lifecycle stage of a queued task.
///
/// The enum carries no data, so it derives `Copy`, `Eq` and `Hash` in
/// addition to `Clone` and `PartialEq`: values can be passed by value and
/// used as keys in hashed collections.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskState {
    /// Queued and not yet started.
    Pending,
    /// Started and not yet finished.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished unsuccessfully.
    Failed,
}
22
23#[allow(dead_code)]
24#[derive(Debug, Clone)]
25pub struct AsyncTask {
26 pub id: u64,
27 pub label: String,
28 pub state: TaskState,
29 pub progress: f32,
30}
31
32#[allow(dead_code)]
33impl AsyncQueue {
34 pub fn new(max_concurrent: usize) -> Self {
35 Self {
36 pending: Vec::new(),
37 completed: Vec::new(),
38 max_concurrent: max_concurrent.max(1),
39 }
40 }
41
42 pub fn enqueue(&mut self, id: u64, label: &str) -> bool {
43 if self.pending.iter().any(|t| t.id == id) || self.completed.iter().any(|t| t.id == id) {
44 return false;
45 }
46 self.pending.push(AsyncTask {
47 id,
48 label: label.to_string(),
49 state: TaskState::Pending,
50 progress: 0.0,
51 });
52 true
53 }
54
55 pub fn pending_count(&self) -> usize {
56 self.pending.len()
57 }
58
59 pub fn completed_count(&self) -> usize {
60 self.completed.len()
61 }
62
63 pub fn total_count(&self) -> usize {
64 self.pending.len() + self.completed.len()
65 }
66
67 pub fn is_empty(&self) -> bool {
68 self.pending.is_empty() && self.completed.is_empty()
69 }
70
71 pub fn running_count(&self) -> usize {
72 self.pending
73 .iter()
74 .filter(|t| t.state == TaskState::Running)
75 .count()
76 }
77
78 pub fn can_start_more(&self) -> bool {
79 self.running_count() < self.max_concurrent
80 }
81
82 pub fn start_next(&mut self) -> Option<u64> {
83 if !self.can_start_more() {
84 return None;
85 }
86 if let Some(task) = self
87 .pending
88 .iter_mut()
89 .find(|t| t.state == TaskState::Pending)
90 {
91 task.state = TaskState::Running;
92 Some(task.id)
93 } else {
94 None
95 }
96 }
97
98 pub fn complete_task(&mut self, id: u64) -> bool {
99 if let Some(pos) = self.pending.iter().position(|t| t.id == id) {
100 let mut task = self.pending.remove(pos);
101 task.state = TaskState::Completed;
102 task.progress = 1.0;
103 self.completed.push(task);
104 true
105 } else {
106 false
107 }
108 }
109
110 pub fn fail_task(&mut self, id: u64) -> bool {
111 if let Some(pos) = self.pending.iter().position(|t| t.id == id) {
112 let mut task = self.pending.remove(pos);
113 task.state = TaskState::Failed;
114 self.completed.push(task);
115 true
116 } else {
117 false
118 }
119 }
120
121 pub fn set_progress(&mut self, id: u64, progress: f32) -> bool {
122 if let Some(task) = self.pending.iter_mut().find(|t| t.id == id) {
123 task.progress = progress.clamp(0.0, 1.0);
124 true
125 } else {
126 false
127 }
128 }
129
130 pub fn clear_completed(&mut self) {
131 self.completed.clear();
132 }
133
134 pub fn get_task(&self, id: u64) -> Option<&AsyncTask> {
135 self.pending
136 .iter()
137 .chain(self.completed.iter())
138 .find(|t| t.id == id)
139 }
140
141 pub fn failed_count(&self) -> usize {
142 self.completed
143 .iter()
144 .filter(|t| t.state == TaskState::Failed)
145 .count()
146 }
147
148 pub fn max_concurrent(&self) -> usize {
149 self.max_concurrent
150 }
151}
152
#[cfg(test)]
mod tests {
    //! Unit tests for `AsyncQueue`: construction, enqueueing, the
    //! pending -> running -> finished lifecycle, and the counters.

    use super::*;

    #[test]
    fn test_new_empty() {
        let q = AsyncQueue::new(4);
        assert!(q.is_empty());
        assert_eq!(q.total_count(), 0);
        assert_eq!(q.max_concurrent(), 4);
    }

    #[test]
    fn test_new_zero_limit_is_raised_to_one() {
        let q = AsyncQueue::new(0);
        assert_eq!(q.max_concurrent(), 1);
        assert!(q.can_start_more());
    }

    #[test]
    fn test_enqueue() {
        let mut q = AsyncQueue::new(2);
        assert!(q.enqueue(1, "task1"));
        assert_eq!(q.pending_count(), 1);
        // A duplicate id is rejected and must not add a second entry.
        assert!(!q.enqueue(1, "dup"));
        assert_eq!(q.pending_count(), 1);
        let task = q.get_task(1).expect("should succeed");
        assert_eq!(task.label, "task1");
        assert_eq!(task.state, TaskState::Pending);
        assert_eq!(task.progress, 0.0);
    }

    #[test]
    fn test_enqueue_rejects_id_of_finished_task_until_cleared() {
        let mut q = AsyncQueue::new(2);
        q.enqueue(1, "a");
        q.complete_task(1);
        assert!(!q.enqueue(1, "again"));
        q.clear_completed();
        assert!(q.enqueue(1, "again"));
    }

    #[test]
    fn test_start_next() {
        let mut q = AsyncQueue::new(1);
        q.enqueue(1, "a");
        q.enqueue(2, "b");
        assert_eq!(q.start_next(), Some(1));
        assert!(q.start_next().is_none());
        assert_eq!(
            q.get_task(1).expect("should succeed").state,
            TaskState::Running
        );
        assert_eq!(
            q.get_task(2).expect("should succeed").state,
            TaskState::Pending
        );
    }

    #[test]
    fn test_start_next_on_empty_queue() {
        let mut q = AsyncQueue::new(2);
        assert!(q.start_next().is_none());
    }

    #[test]
    fn test_start_next_after_slot_frees() {
        let mut q = AsyncQueue::new(1);
        q.enqueue(1, "a");
        q.enqueue(2, "b");
        assert_eq!(q.start_next(), Some(1));
        assert!(q.complete_task(1));
        // Finishing task 1 frees the only slot, so task 2 may start now.
        assert_eq!(q.start_next(), Some(2));
    }

    #[test]
    fn test_complete() {
        let mut q = AsyncQueue::new(2);
        q.enqueue(1, "a");
        q.start_next();
        assert!(q.complete_task(1));
        assert_eq!(q.completed_count(), 1);
        assert_eq!(q.pending_count(), 0);
        let task = q.get_task(1).expect("should succeed");
        assert_eq!(task.state, TaskState::Completed);
        assert_eq!(task.progress, 1.0);
        // A task can only be finished once.
        assert!(!q.complete_task(1));
    }

    #[test]
    fn test_fail() {
        let mut q = AsyncQueue::new(2);
        q.enqueue(1, "a");
        q.start_next();
        assert!(q.fail_task(1));
        assert_eq!(q.failed_count(), 1);
        assert_eq!(q.completed_count(), 1);
        assert_eq!(q.pending_count(), 0);
        assert_eq!(
            q.get_task(1).expect("should succeed").state,
            TaskState::Failed
        );
    }

    #[test]
    fn test_unknown_id_is_rejected() {
        let mut q = AsyncQueue::new(2);
        q.enqueue(1, "a");
        assert!(!q.complete_task(42));
        assert!(!q.fail_task(42));
        assert!(!q.set_progress(42, 0.5));
        assert!(q.get_task(42).is_none());
        assert_eq!(q.pending_count(), 1);
    }

    #[test]
    fn test_progress() {
        let mut q = AsyncQueue::new(2);
        q.enqueue(1, "a");
        q.start_next();
        assert!(q.set_progress(1, 0.5));
        let t = q.get_task(1).expect("should succeed");
        assert!((t.progress - 0.5).abs() < f32::EPSILON);
    }

    #[test]
    fn test_progress_is_clamped() {
        let mut q = AsyncQueue::new(2);
        q.enqueue(1, "a");
        assert!(q.set_progress(1, 2.5));
        assert_eq!(q.get_task(1).expect("should succeed").progress, 1.0);
        assert!(q.set_progress(1, -1.0));
        assert_eq!(q.get_task(1).expect("should succeed").progress, 0.0);
    }

    #[test]
    fn test_clear_completed() {
        let mut q = AsyncQueue::new(2);
        q.enqueue(1, "a");
        q.start_next();
        q.complete_task(1);
        q.clear_completed();
        assert_eq!(q.completed_count(), 0);
        assert!(q.is_empty());
    }

    #[test]
    fn test_total_count() {
        let mut q = AsyncQueue::new(2);
        q.enqueue(1, "a");
        q.enqueue(2, "b");
        q.start_next();
        q.complete_task(1);
        assert_eq!(q.total_count(), 2);
        assert_eq!(q.pending_count(), 1);
        assert_eq!(q.completed_count(), 1);
    }

    #[test]
    fn test_running_count() {
        let mut q = AsyncQueue::new(3);
        q.enqueue(1, "a");
        q.enqueue(2, "b");
        assert_eq!(q.running_count(), 0);
        q.start_next();
        q.start_next();
        assert_eq!(q.running_count(), 2);
    }

    #[test]
    fn test_can_start_more() {
        let mut q = AsyncQueue::new(1);
        assert!(q.can_start_more());
        q.enqueue(1, "a");
        q.start_next();
        assert!(!q.can_start_more());
    }
}