reovim_kernel/sched/work_queue.rs
1//! Work queue for deferred task execution.
2//!
3//! Linux equivalent: `kernel/workqueue.c`
4//!
5//! This module provides a bounded, thread-safe queue for scheduling
6//! tasks to be executed later by the runtime.
7//!
8//! # Features
9//!
10//! - **Bounded capacity**: Prevents unbounded memory growth
11//! - **Overflow detection**: Tracks dropped tasks without blocking
12//! - **Thread-safe**: Can be accessed from multiple threads
13//! - **FIFO ordering**: Tasks processed in submission order
14//!
15//! # Example
16//!
17//! ```
18//! use reovim_kernel::api::v1::*;
19//!
20//! let queue = WorkQueue::new();
21//!
22//! // Push some tasks
23//! queue.push(Task::new(|| println!("Task 1")));
24//! queue.push(Task::new(|| println!("Task 2")));
25//!
26//! // Pop and execute
27//! while let Some(mut task) = queue.try_pop() {
28//! task.execute().ok();
29//! }
30//! ```
31
32use std::{
33 collections::VecDeque,
34 sync::atomic::{AtomicUsize, Ordering},
35};
36
37use crate::arch::sync::Mutex;
38
39use super::task::Task;
40
41/// Default capacity for work queue.
42pub const DEFAULT_CAPACITY: usize = 1024;
43
44/// Maximum capacity (to prevent unbounded growth).
45pub const MAX_CAPACITY: usize = 4096;
46
47/// Work queue for deferred tasks.
48///
49/// Thread-safe, FIFO queue with bounded capacity. When the queue is full,
50/// new tasks are dropped and the overflow counter is incremented.
51///
52/// # Thread Safety
53///
54/// The queue uses a `Mutex<VecDeque<Task>>` internally, making it safe
55/// to push from multiple threads. However, heavy contention should be
56/// avoided for performance.
57pub struct WorkQueue {
58 /// Queue storage.
59 queue: Mutex<VecDeque<Task>>,
60
61 /// Maximum capacity.
62 capacity: usize,
63
64 /// Number of tasks dropped due to overflow.
65 dropped: AtomicUsize,
66}
67
68impl WorkQueue {
69 /// Create a new work queue with default capacity.
70 #[must_use]
71 pub fn new() -> Self {
72 Self::with_capacity(DEFAULT_CAPACITY)
73 }
74
75 /// Create a work queue with specified capacity.
76 ///
77 /// Capacity is clamped to `MAX_CAPACITY`.
78 #[must_use]
79 pub fn with_capacity(capacity: usize) -> Self {
80 let capacity = capacity.min(MAX_CAPACITY);
81 Self {
82 queue: Mutex::new(VecDeque::with_capacity(capacity)),
83 capacity,
84 dropped: AtomicUsize::new(0),
85 }
86 }
87
88 /// Push a task to the queue.
89 ///
90 /// Returns `true` if the task was queued, `false` if dropped due to overflow.
91 /// This is non-blocking - overflow tasks are immediately dropped.
92 pub fn push(&self, task: Task) -> bool {
93 let mut queue = self.queue.lock();
94 if queue.len() >= self.capacity {
95 self.dropped.fetch_add(1, Ordering::Relaxed);
96 return false;
97 }
98 queue.push_back(task);
99 true
100 }
101
102 /// Try to pop a task from the queue.
103 ///
104 /// Returns `None` if the queue is empty.
105 #[must_use]
106 pub fn try_pop(&self) -> Option<Task> {
107 self.queue.lock().pop_front()
108 }
109
110 /// Drain all tasks from the queue.
111 ///
112 /// Returns all pending tasks, leaving the queue empty.
113 #[must_use]
114 pub fn drain(&self) -> Vec<Task> {
115 let mut queue = self.queue.lock();
116 queue.drain(..).collect()
117 }
118
119 /// Process up to `limit` pending tasks.
120 ///
121 /// Pops and executes tasks in order. Returns the number of tasks
122 /// successfully executed. Tasks that fail or panic are counted
123 /// but not returned.
124 pub fn process_pending(&self, limit: usize) -> usize {
125 let mut processed = 0;
126 while processed < limit {
127 let Some(mut task) = self.try_pop() else {
128 break;
129 };
130 // Execute, ignoring errors (caller can use executor for panic handling)
131 let _ = task.execute();
132 processed += 1;
133 }
134 processed
135 }
136
137 /// Get the number of pending tasks.
138 ///
139 /// Note: This is a snapshot and may change immediately after returning.
140 #[must_use]
141 pub fn len(&self) -> usize {
142 self.queue.lock().len()
143 }
144
145 /// Check if the queue is empty.
146 ///
147 /// Note: This is a snapshot and may change immediately after returning.
148 #[must_use]
149 pub fn is_empty(&self) -> bool {
150 self.queue.lock().is_empty()
151 }
152
153 /// Get the queue capacity.
154 #[inline]
155 #[must_use]
156 pub const fn capacity(&self) -> usize {
157 self.capacity
158 }
159
160 /// Get the number of dropped tasks (overflow).
161 ///
162 /// This count accumulates over the lifetime of the queue.
163 #[must_use]
164 pub fn dropped_count(&self) -> usize {
165 self.dropped.load(Ordering::Relaxed)
166 }
167
168 /// Clear all pending tasks without executing them.
169 ///
170 /// Returns the number of tasks cleared.
171 pub fn clear(&self) -> usize {
172 let mut queue = self.queue.lock();
173 let count = queue.len();
174 queue.clear();
175 count
176 }
177}
178
179impl Default for WorkQueue {
180 fn default() -> Self {
181 Self::new()
182 }
183}
184
185impl std::fmt::Debug for WorkQueue {
186 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187 f.debug_struct("WorkQueue")
188 .field("len", &self.len())
189 .field("capacity", &self.capacity)
190 .field("dropped", &self.dropped_count())
191 .finish_non_exhaustive()
192 }
193}