Skip to main content

oxihuman_core/
async_queue.rs

1// Copyright (C) 2026 COOLJAPAN OU (Team KitaSan)
2// SPDX-License-Identifier: Apache-2.0
3#![allow(dead_code)]
4
5/// A simple async-style task queue that stores pending and completed tasks.
6#[allow(dead_code)]
7#[derive(Debug, Clone)]
8pub struct AsyncQueue {
9    pending: Vec<AsyncTask>,
10    completed: Vec<AsyncTask>,
11    max_concurrent: usize,
12}
13
/// Lifecycle state of an [`AsyncTask`].
///
/// The enum is fieldless, so it is `Copy` and can be compared, hashed and
/// passed by value freely.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskState {
    /// Queued but not started yet; assigned by `AsyncQueue::enqueue`.
    Pending,
    /// Started and counted against the concurrency limit; assigned by
    /// `AsyncQueue::start_next`.
    Running,
    /// Finished successfully; assigned by `AsyncQueue::complete_task`.
    Completed,
    /// Finished unsuccessfully; assigned by `AsyncQueue::fail_task`.
    Failed,
}
22
23#[allow(dead_code)]
24#[derive(Debug, Clone)]
25pub struct AsyncTask {
26    pub id: u64,
27    pub label: String,
28    pub state: TaskState,
29    pub progress: f32,
30}
31
32#[allow(dead_code)]
33impl AsyncQueue {
34    pub fn new(max_concurrent: usize) -> Self {
35        Self {
36            pending: Vec::new(),
37            completed: Vec::new(),
38            max_concurrent: max_concurrent.max(1),
39        }
40    }
41
42    pub fn enqueue(&mut self, id: u64, label: &str) -> bool {
43        if self.pending.iter().any(|t| t.id == id) || self.completed.iter().any(|t| t.id == id) {
44            return false;
45        }
46        self.pending.push(AsyncTask {
47            id,
48            label: label.to_string(),
49            state: TaskState::Pending,
50            progress: 0.0,
51        });
52        true
53    }
54
55    pub fn pending_count(&self) -> usize {
56        self.pending.len()
57    }
58
59    pub fn completed_count(&self) -> usize {
60        self.completed.len()
61    }
62
63    pub fn total_count(&self) -> usize {
64        self.pending.len() + self.completed.len()
65    }
66
67    pub fn is_empty(&self) -> bool {
68        self.pending.is_empty() && self.completed.is_empty()
69    }
70
71    pub fn running_count(&self) -> usize {
72        self.pending
73            .iter()
74            .filter(|t| t.state == TaskState::Running)
75            .count()
76    }
77
78    pub fn can_start_more(&self) -> bool {
79        self.running_count() < self.max_concurrent
80    }
81
82    pub fn start_next(&mut self) -> Option<u64> {
83        if !self.can_start_more() {
84            return None;
85        }
86        if let Some(task) = self
87            .pending
88            .iter_mut()
89            .find(|t| t.state == TaskState::Pending)
90        {
91            task.state = TaskState::Running;
92            Some(task.id)
93        } else {
94            None
95        }
96    }
97
98    pub fn complete_task(&mut self, id: u64) -> bool {
99        if let Some(pos) = self.pending.iter().position(|t| t.id == id) {
100            let mut task = self.pending.remove(pos);
101            task.state = TaskState::Completed;
102            task.progress = 1.0;
103            self.completed.push(task);
104            true
105        } else {
106            false
107        }
108    }
109
110    pub fn fail_task(&mut self, id: u64) -> bool {
111        if let Some(pos) = self.pending.iter().position(|t| t.id == id) {
112            let mut task = self.pending.remove(pos);
113            task.state = TaskState::Failed;
114            self.completed.push(task);
115            true
116        } else {
117            false
118        }
119    }
120
121    pub fn set_progress(&mut self, id: u64, progress: f32) -> bool {
122        if let Some(task) = self.pending.iter_mut().find(|t| t.id == id) {
123            task.progress = progress.clamp(0.0, 1.0);
124            true
125        } else {
126            false
127        }
128    }
129
130    pub fn clear_completed(&mut self) {
131        self.completed.clear();
132    }
133
134    pub fn get_task(&self, id: u64) -> Option<&AsyncTask> {
135        self.pending
136            .iter()
137            .chain(self.completed.iter())
138            .find(|t| t.id == id)
139    }
140
141    pub fn failed_count(&self) -> usize {
142        self.completed
143            .iter()
144            .filter(|t| t.state == TaskState::Failed)
145            .count()
146    }
147
148    pub fn max_concurrent(&self) -> usize {
149        self.max_concurrent
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queue with the given concurrency limit and enqueues each
    /// `(id, label)` pair in order.
    fn queue_with(limit: usize, tasks: &[(u64, &str)]) -> AsyncQueue {
        let mut queue = AsyncQueue::new(limit);
        for &(id, label) in tasks {
            queue.enqueue(id, label);
        }
        queue
    }

    // A fresh queue tracks nothing and reports the requested limit.
    #[test]
    fn test_new_empty() {
        let queue = queue_with(4, &[]);
        assert!(queue.is_empty());
        assert_eq!(queue.max_concurrent(), 4);
    }

    // A second enqueue with the same id is refused.
    #[test]
    fn test_enqueue() {
        let mut queue = queue_with(2, &[]);
        let accepted = queue.enqueue(1, "task1");
        assert!(accepted);
        assert_eq!(queue.pending_count(), 1);
        let duplicate = queue.enqueue(1, "dup");
        assert!(!duplicate);
    }

    // With a limit of one, only the first task can be started.
    #[test]
    fn test_start_next() {
        let mut queue = queue_with(1, &[(1, "a"), (2, "b")]);
        let first = queue.start_next();
        assert_eq!(first, Some(1));
        let second = queue.start_next();
        assert!(second.is_none());
    }

    // Completing a task moves it from the unfinished to the finished list.
    #[test]
    fn test_complete() {
        let mut queue = queue_with(2, &[(1, "a")]);
        queue.start_next();
        let completed = queue.complete_task(1);
        assert!(completed);
        assert_eq!(queue.completed_count(), 1);
        assert_eq!(queue.pending_count(), 0);
    }

    // A failed task is counted by `failed_count`.
    #[test]
    fn test_fail() {
        let mut queue = queue_with(2, &[(1, "a")]);
        queue.start_next();
        let failed = queue.fail_task(1);
        assert!(failed);
        assert_eq!(queue.failed_count(), 1);
    }

    // Progress written through `set_progress` is visible via `get_task`.
    #[test]
    fn test_progress() {
        let mut queue = queue_with(2, &[(1, "a")]);
        queue.start_next();
        let updated = queue.set_progress(1, 0.5);
        assert!(updated);
        let task = queue.get_task(1).expect("should succeed");
        let delta = (task.progress - 0.5).abs();
        assert!(delta < f32::EPSILON);
    }

    // Clearing drops every finished task.
    #[test]
    fn test_clear_completed() {
        let mut queue = queue_with(2, &[(1, "a")]);
        queue.start_next();
        queue.complete_task(1);
        queue.clear_completed();
        assert_eq!(queue.completed_count(), 0);
    }

    // Finished and unfinished tasks both count towards the total.
    #[test]
    fn test_total_count() {
        let mut queue = queue_with(2, &[(1, "a"), (2, "b")]);
        queue.start_next();
        queue.complete_task(1);
        assert_eq!(queue.total_count(), 2);
    }

    // Two starts under a limit of three leave two tasks running.
    #[test]
    fn test_running_count() {
        let mut queue = queue_with(3, &[(1, "a"), (2, "b")]);
        for _ in 0..2 {
            queue.start_next();
        }
        assert_eq!(queue.running_count(), 2);
    }

    // Capacity is available until the single slot is taken.
    #[test]
    fn test_can_start_more() {
        let mut queue = queue_with(1, &[]);
        assert!(queue.can_start_more());
        queue.enqueue(1, "a");
        queue.start_next();
        assert!(!queue.can_start_more());
    }
}