Skip to main content

reovim_kernel/sched/
executor.rs

1//! Synchronous task executor.
2//!
3//! Linux equivalent: Core scheduler in `kernel/sched/core.c`
4//!
5//! This module provides an executor for running tasks synchronously with
6//! panic handling. Tasks that panic are caught and marked as failed rather
7//! than crashing the entire runtime.
8//!
9//! # Panic Safety
10//!
11//! The executor wraps task execution in `catch_unwind`, ensuring that a
12//! panicking task doesn't bring down the entire editor. Failed tasks are
13//! marked as such and counted in statistics.
14//!
15//! # Example
16//!
17//! ```
18//! use reovim_kernel::api::v1::*;
19//! use std::sync::Arc;
20//!
21//! let queue = Arc::new(WorkQueue::new());
22//! let mut executor = Executor::new(Arc::clone(&queue));
23//!
24//! // Schedule some tasks
25//! queue.push(Task::new(|| println!("Task 1")));
26//! queue.push(Task::new(|| println!("Task 2")));
27//!
28//! // Process tasks
29//! let processed = executor.tick();
30//! assert_eq!(processed, 2);
31//! ```
32
33use std::{
34    panic::{AssertUnwindSafe, catch_unwind},
35    sync::Arc,
36};
37
38use super::{task::Task, work_queue::WorkQueue};
39
40/// Default batch size for processing.
41pub(super) const DEFAULT_BATCH_SIZE: usize = 16;
42
43/// Task executor for synchronous task processing.
44///
45/// Provides a simple run-to-completion model where tasks are executed
46/// one at a time. Panics are caught to prevent runtime crashes.
47pub struct Executor {
48    /// Work queue for pending tasks.
49    work_queue: Arc<WorkQueue>,
50
51    /// Maximum tasks to process per tick.
52    batch_size: usize,
53
54    /// Total tasks successfully executed.
55    executed_count: u64,
56
57    /// Total tasks that failed (error or panic).
58    failed_count: u64,
59}
60
61impl Executor {
62    /// Create a new executor with a shared work queue.
63    #[must_use]
64    pub const fn new(work_queue: Arc<WorkQueue>) -> Self {
65        Self {
66            work_queue,
67            batch_size: DEFAULT_BATCH_SIZE,
68            executed_count: 0,
69            failed_count: 0,
70        }
71    }
72
73    /// Set the batch size for processing.
74    ///
75    /// The batch size limits how many tasks are processed per `tick()` call.
76    #[must_use]
77    pub fn with_batch_size(mut self, size: usize) -> Self {
78        self.batch_size = size.max(1);
79        self
80    }
81
82    /// Process one tick (up to `batch_size` tasks).
83    ///
84    /// Returns the total number of tasks processed (successful + failed).
85    pub fn tick(&mut self) -> usize {
86        let mut processed = 0;
87
88        while processed < self.batch_size {
89            let Some(mut task) = self.work_queue.try_pop() else {
90                break;
91            };
92
93            if Self::execute_task(&mut task) {
94                self.executed_count += 1;
95            } else {
96                self.failed_count += 1;
97            }
98            processed += 1;
99        }
100
101        processed
102    }
103
104    /// Execute a single task with panic handling.
105    ///
106    /// Returns `true` if the task completed successfully, `false` if it
107    /// failed or panicked.
108    #[cfg_attr(coverage_nightly, coverage(off))]
109    pub fn execute_task(task: &mut Task) -> bool {
110        // Wrap in catch_unwind to handle panics
111        let result = catch_unwind(AssertUnwindSafe(|| task.execute()));
112
113        match result {
114            Ok(Ok(())) => true,
115            Ok(Err(_)) => {
116                // Task returned an error
117                task.mark_failed();
118                false
119            }
120            Err(_panic) => {
121                // Task panicked
122                task.mark_failed();
123                false
124            }
125        }
126    }
127
128    /// Schedule a task for execution.
129    ///
130    /// Returns `false` if the queue is full.
131    #[must_use]
132    pub fn schedule(&self, task: Task) -> bool {
133        self.work_queue.push(task)
134    }
135
136    /// Schedule work using a closure.
137    ///
138    /// Convenience method that creates a task with normal priority.
139    pub fn spawn<F>(&self, work: F) -> bool
140    where
141        F: FnOnce() + Send + 'static,
142    {
143        self.schedule(Task::new(work))
144    }
145
146    /// Get the total number of successfully executed tasks.
147    #[inline]
148    #[must_use]
149    pub const fn executed_count(&self) -> u64 {
150        self.executed_count
151    }
152
153    /// Get the total number of failed tasks.
154    #[inline]
155    #[must_use]
156    pub const fn failed_count(&self) -> u64 {
157        self.failed_count
158    }
159
160    /// Check if there's pending work.
161    #[must_use]
162    pub fn has_pending(&self) -> bool {
163        !self.work_queue.is_empty()
164    }
165
166    /// Get the current batch size.
167    #[inline]
168    #[must_use]
169    pub const fn batch_size(&self) -> usize {
170        self.batch_size
171    }
172
173    /// Drain all pending tasks, executing each one.
174    ///
175    /// Returns the total number of tasks processed.
176    pub fn drain(&mut self) -> usize {
177        let mut total = 0;
178        while self.has_pending() {
179            total += self.tick();
180        }
181        total
182    }
183
184    /// Reset execution statistics.
185    pub const fn reset_stats(&mut self) {
186        self.executed_count = 0;
187        self.failed_count = 0;
188    }
189}
190
191impl std::fmt::Debug for Executor {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        f.debug_struct("Executor")
194            .field("batch_size", &self.batch_size)
195            .field("executed_count", &self.executed_count)
196            .field("failed_count", &self.failed_count)
197            .field("pending", &self.work_queue.len())
198            .finish()
199    }
200}