Skip to main content

reovim_kernel/sched/
work_queue.rs

1//! Work queue for deferred task execution.
2//!
3//! Linux equivalent: `kernel/workqueue.c`
4//!
5//! This module provides a bounded, thread-safe queue for scheduling
6//! tasks to be executed later by the runtime.
7//!
8//! # Features
9//!
10//! - **Bounded capacity**: Prevents unbounded memory growth
11//! - **Overflow detection**: Tracks dropped tasks without blocking
12//! - **Thread-safe**: Can be accessed from multiple threads
13//! - **FIFO ordering**: Tasks processed in submission order
14//!
15//! # Example
16//!
17//! ```
18//! use reovim_kernel::api::v1::*;
19//!
20//! let queue = WorkQueue::new();
21//!
22//! // Push some tasks
23//! queue.push(Task::new(|| println!("Task 1")));
24//! queue.push(Task::new(|| println!("Task 2")));
25//!
26//! // Pop and execute
27//! while let Some(mut task) = queue.try_pop() {
28//!     task.execute().ok();
29//! }
30//! ```
31
32use std::{
33    collections::VecDeque,
34    sync::atomic::{AtomicUsize, Ordering},
35};
36
37use crate::arch::sync::Mutex;
38
39use super::task::Task;
40
/// Number of tasks a queue built with `WorkQueue::new` can hold.
pub const DEFAULT_CAPACITY: usize = 1_024;

/// Hard upper bound on the capacity of any work queue.
///
/// Requested capacities above this value are clamped down to it, which
/// keeps the memory used by a single queue bounded.
pub const MAX_CAPACITY: usize = 4_096;
46
47/// Work queue for deferred tasks.
48///
49/// Thread-safe, FIFO queue with bounded capacity. When the queue is full,
50/// new tasks are dropped and the overflow counter is incremented.
51///
52/// # Thread Safety
53///
54/// The queue uses a `Mutex<VecDeque<Task>>` internally, making it safe
55/// to push from multiple threads. However, heavy contention should be
56/// avoided for performance.
57pub struct WorkQueue {
58    /// Queue storage.
59    queue: Mutex<VecDeque<Task>>,
60
61    /// Maximum capacity.
62    capacity: usize,
63
64    /// Number of tasks dropped due to overflow.
65    dropped: AtomicUsize,
66}
67
68impl WorkQueue {
69    /// Create a new work queue with default capacity.
70    #[must_use]
71    pub fn new() -> Self {
72        Self::with_capacity(DEFAULT_CAPACITY)
73    }
74
75    /// Create a work queue with specified capacity.
76    ///
77    /// Capacity is clamped to `MAX_CAPACITY`.
78    #[must_use]
79    pub fn with_capacity(capacity: usize) -> Self {
80        let capacity = capacity.min(MAX_CAPACITY);
81        Self {
82            queue: Mutex::new(VecDeque::with_capacity(capacity)),
83            capacity,
84            dropped: AtomicUsize::new(0),
85        }
86    }
87
88    /// Push a task to the queue.
89    ///
90    /// Returns `true` if the task was queued, `false` if dropped due to overflow.
91    /// This is non-blocking - overflow tasks are immediately dropped.
92    pub fn push(&self, task: Task) -> bool {
93        let mut queue = self.queue.lock();
94        if queue.len() >= self.capacity {
95            self.dropped.fetch_add(1, Ordering::Relaxed);
96            return false;
97        }
98        queue.push_back(task);
99        true
100    }
101
102    /// Try to pop a task from the queue.
103    ///
104    /// Returns `None` if the queue is empty.
105    #[must_use]
106    pub fn try_pop(&self) -> Option<Task> {
107        self.queue.lock().pop_front()
108    }
109
110    /// Drain all tasks from the queue.
111    ///
112    /// Returns all pending tasks, leaving the queue empty.
113    #[must_use]
114    pub fn drain(&self) -> Vec<Task> {
115        let mut queue = self.queue.lock();
116        queue.drain(..).collect()
117    }
118
119    /// Process up to `limit` pending tasks.
120    ///
121    /// Pops and executes tasks in order. Returns the number of tasks
122    /// successfully executed. Tasks that fail or panic are counted
123    /// but not returned.
124    pub fn process_pending(&self, limit: usize) -> usize {
125        let mut processed = 0;
126        while processed < limit {
127            let Some(mut task) = self.try_pop() else {
128                break;
129            };
130            // Execute, ignoring errors (caller can use executor for panic handling)
131            let _ = task.execute();
132            processed += 1;
133        }
134        processed
135    }
136
137    /// Get the number of pending tasks.
138    ///
139    /// Note: This is a snapshot and may change immediately after returning.
140    #[must_use]
141    pub fn len(&self) -> usize {
142        self.queue.lock().len()
143    }
144
145    /// Check if the queue is empty.
146    ///
147    /// Note: This is a snapshot and may change immediately after returning.
148    #[must_use]
149    pub fn is_empty(&self) -> bool {
150        self.queue.lock().is_empty()
151    }
152
153    /// Get the queue capacity.
154    #[inline]
155    #[must_use]
156    pub const fn capacity(&self) -> usize {
157        self.capacity
158    }
159
160    /// Get the number of dropped tasks (overflow).
161    ///
162    /// This count accumulates over the lifetime of the queue.
163    #[must_use]
164    pub fn dropped_count(&self) -> usize {
165        self.dropped.load(Ordering::Relaxed)
166    }
167
168    /// Clear all pending tasks without executing them.
169    ///
170    /// Returns the number of tasks cleared.
171    pub fn clear(&self) -> usize {
172        let mut queue = self.queue.lock();
173        let count = queue.len();
174        queue.clear();
175        count
176    }
177}
178
179impl Default for WorkQueue {
180    fn default() -> Self {
181        Self::new()
182    }
183}
184
185impl std::fmt::Debug for WorkQueue {
186    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187        f.debug_struct("WorkQueue")
188            .field("len", &self.len())
189            .field("capacity", &self.capacity)
190            .field("dropped", &self.dropped_count())
191            .finish_non_exhaustive()
192    }
193}