reovim_kernel/mm/saturator.rs
1//! Background task scheduler for lazy computation.
2//!
3//! The Saturator provides a mechanism for scheduling background computation
4//! with priority levels. It's designed for tasks like syntax highlighting
5//! where viewport content should be processed immediately (high priority)
6//! while off-screen content can be processed later (low priority).
7//!
8//! # Design Philosophy
9//!
10//! Following the kernel "mechanism, not policy" principle:
11//! - Generic over work item type and result type
12//! - No syntax or highlighting knowledge in the kernel
13//! - Priority is a simple high/low distinction
14//! - `EventScope` integration for lifecycle tracking
15//!
16//! # Thread Model
17//!
18//! The saturator spawns a dedicated background thread that processes work
19//! items from two priority queues. High priority items are always processed
20//! before low priority items.
21//!
22//! # Example
23//!
24//! ```ignore
25//! use reovim_kernel::api::v1::*;
26//!
27//! // Create a saturator for processing lines
28//! let saturator = spawn_saturator(
29//! |line_idx: usize| {
30//! // Process line and return result
31//! format!("Processed line {}", line_idx)
32//! },
33//! |result| {
34//! println!("Completed: {}", result);
35//! },
36//! );
37//!
38//! // Submit high-priority work (viewport)
39//! saturator.submit(0, None);
40//! saturator.submit(1, None);
41//!
42//! // Submit low-priority work (off-screen)
43//! saturator.submit_background(100, None);
44//! ```
45
46use std::{
47 sync::{
48 Arc,
49 atomic::{AtomicBool, Ordering},
50 },
51 thread::{self, JoinHandle},
52};
53
54use crate::ipc::{EventScope, Receiver, Sender, channel};
55
/// Urgency class attached to a unit of background work.
///
/// The kernel only distinguishes two levels; deciding which work counts as
/// urgent is left to the caller.
#[derive(Clone, Copy, Debug, Default, Eq, Hash, PartialEq)]
pub enum RequestPriority {
    /// Serviced ahead of any low-priority work (e.g. viewport content).
    ///
    /// This is the default priority.
    #[default]
    High,
    /// Serviced only when no high-priority work is waiting (e.g. off-screen content).
    Low,
}
65
66/// A request to the saturator with associated scope.
67#[derive(Debug)]
68pub struct SaturationRequest<T> {
69 /// The work data to process.
70 pub data: T,
71 /// Request priority.
72 pub priority: RequestPriority,
73 /// Optional scope for lifecycle tracking.
74 pub scope: Option<EventScope>,
75}
76
77/// Handle to send work to a saturator.
78///
79/// The handle is cheap to clone and can be shared across threads.
80/// When all handles are dropped, the saturator will finish processing
81/// remaining work and shut down.
82pub struct SaturatorHandle<T> {
83 /// Channel for high-priority work.
84 high_tx: Sender<WorkItem<T>>,
85 /// Channel for low-priority work.
86 low_tx: Sender<WorkItem<T>>,
87 /// Shutdown flag.
88 shutdown: Arc<AtomicBool>,
89 /// Worker thread handle (only the original handle has this).
90 worker: Option<JoinHandle<()>>,
91}
92
93/// Internal work item with scope tracking.
94struct WorkItem<T> {
95 data: T,
96 scope: Option<EventScope>,
97}
98
99impl<T: Send + 'static> SaturatorHandle<T> {
100 /// Submit high-priority work with optional scope tracking.
101 ///
102 /// Follows the `EventScope` pattern from `event_bus.rs:252-259`:
103 /// scope is incremented before submit and decremented after completion.
104 ///
105 /// # Arguments
106 ///
107 /// * `work` - The work data to process
108 /// * `scope` - Optional scope for lifecycle tracking
109 pub fn submit(&self, work: T, scope: Option<&EventScope>) {
110 if let Some(s) = scope {
111 s.increment();
112 }
113 let _ = self.high_tx.send(WorkItem {
114 data: work,
115 scope: scope.cloned(),
116 });
117 }
118
119 /// Submit low-priority background work.
120 ///
121 /// # Arguments
122 ///
123 /// * `work` - The work data to process
124 /// * `scope` - Optional scope for lifecycle tracking
125 pub fn submit_background(&self, work: T, scope: Option<&EventScope>) {
126 if let Some(s) = scope {
127 s.increment();
128 }
129 let _ = self.low_tx.send(WorkItem {
130 data: work,
131 scope: scope.cloned(),
132 });
133 }
134
135 /// Submit a request with explicit priority.
136 pub fn submit_request(&self, request: SaturationRequest<T>) {
137 let scope = request.scope.as_ref();
138 match request.priority {
139 RequestPriority::High => self.submit(request.data, scope),
140 RequestPriority::Low => self.submit_background(request.data, scope),
141 }
142 }
143
144 /// Request shutdown of the saturator.
145 ///
146 /// The worker will finish processing remaining items before stopping.
147 pub fn shutdown(&self) {
148 self.shutdown.store(true, Ordering::Release);
149 }
150
151 /// Check if the saturator is shutting down.
152 #[must_use]
153 pub fn is_shutting_down(&self) -> bool {
154 self.shutdown.load(Ordering::Acquire)
155 }
156}
157
158impl<T> Clone for SaturatorHandle<T> {
159 fn clone(&self) -> Self {
160 Self {
161 high_tx: self.high_tx.clone(),
162 low_tx: self.low_tx.clone(),
163 shutdown: Arc::clone(&self.shutdown),
164 worker: None, // Clones don't own the worker thread
165 }
166 }
167}
168
169impl<T> Drop for SaturatorHandle<T> {
170 fn drop(&mut self) {
171 // Only the original handle owns the worker thread
172 if let Some(worker) = self.worker.take() {
173 self.shutdown.store(true, Ordering::Release);
174 // Wait for worker to finish
175 let _ = worker.join();
176 }
177 }
178}
179
180/// Spawn a saturator with the given processor and completion callback.
181///
182/// Creates a background worker thread that processes work items and calls
183/// the completion callback with results.
184///
185/// # Arguments
186///
187/// * `processor` - Function to process work items
188/// * `on_complete` - Callback called with each result
189///
190/// # Type Parameters
191///
192/// * `T` - Work item type (must be Send + 'static)
193/// * `F` - Processor function type
194/// * `R` - Result type (must be Send + 'static)
195///
196/// # Returns
197///
198/// A handle for submitting work to the saturator.
199///
200/// # Example
201///
202/// ```ignore
203/// let handle = spawn_saturator(
204/// |x: i32| x * 2,
205/// |result| println!("Result: {}", result),
206/// );
207///
208/// handle.submit(21, None); // Will print "Result: 42"
209/// ```
210pub fn spawn_saturator<T, F, R, C>(processor: F, on_complete: C) -> SaturatorHandle<T>
211where
212 T: Send + 'static,
213 F: Fn(T) -> R + Send + Sync + 'static,
214 R: Send + 'static,
215 C: Fn(R) + Send + Sync + 'static,
216{
217 let (high_tx, high_rx) = channel();
218 let (low_tx, low_rx) = channel();
219 let shutdown = Arc::new(AtomicBool::new(false));
220 let shutdown_clone = Arc::clone(&shutdown);
221
222 let processor = Arc::new(processor);
223 let on_complete = Arc::new(on_complete);
224
225 let worker = thread::spawn(move || {
226 worker_loop(high_rx, low_rx, shutdown_clone, processor, on_complete);
227 });
228
229 SaturatorHandle {
230 high_tx,
231 low_tx,
232 shutdown,
233 worker: Some(worker),
234 }
235}
236
237/// Worker loop that processes items from both queues.
238#[allow(clippy::needless_pass_by_value)] // Receivers are intentionally moved into the thread
239#[cfg_attr(coverage_nightly, coverage(off))]
240fn worker_loop<T, F, R, C>(
241 high_rx: Receiver<WorkItem<T>>,
242 low_rx: Receiver<WorkItem<T>>,
243 shutdown: Arc<AtomicBool>,
244 processor: Arc<F>,
245 on_complete: Arc<C>,
246) where
247 T: Send + 'static,
248 F: Fn(T) -> R + Send + Sync + 'static,
249 R: Send + 'static,
250 C: Fn(R) + Send + Sync + 'static,
251{
252 loop {
253 // Check shutdown flag
254 if shutdown.load(Ordering::Acquire) {
255 // Drain remaining high-priority items before shutting down
256 while let Ok(item) = high_rx.try_recv() {
257 process_item(item, &processor, &on_complete);
258 }
259 break;
260 }
261
262 // Try high priority first (biased polling)
263 if let Ok(item) = high_rx.try_recv() {
264 process_item(item, &processor, &on_complete);
265 continue;
266 }
267
268 // Then try low priority
269 if let Ok(item) = low_rx.try_recv() {
270 process_item(item, &processor, &on_complete);
271 continue;
272 }
273
274 // No work available, yield to avoid busy-waiting
275 thread::yield_now();
276 }
277}
278
279/// Process a single work item.
280fn process_item<T, F, R, C>(item: WorkItem<T>, processor: &Arc<F>, on_complete: &Arc<C>)
281where
282 F: Fn(T) -> R,
283 C: Fn(R),
284{
285 let result = processor(item.data);
286 on_complete(result);
287
288 // Decrement scope after completion (follows event_bus.rs pattern)
289 if let Some(scope) = item.scope {
290 scope.decrement();
291 }
292}
293
/// Tunables for a saturator.
///
/// NOTE(review): nothing in the visible part of this file reads this type
/// (`spawn_saturator` takes no configuration) — confirm where it is consumed.
#[derive(Clone, Debug)]
pub struct SaturatorConfig {
    /// Whether items still queued at shutdown should be processed.
    pub drain_on_shutdown: bool,
}
300
301impl Default for SaturatorConfig {
302 fn default() -> Self {
303 Self {
304 drain_on_shutdown: true,
305 }
306 }
307}