reovim_kernel/sched/runtime.rs
1//! Runtime event loop coordinator.
2//!
3//! Linux equivalent: Main scheduler in `kernel/sched/core.c`
4//!
5//! The [`Runtime`] is the central coordinator for event processing and task
6//! execution. It manages the event loop lifecycle, processes events from the
7//! priority queue, and executes deferred tasks.
8//!
9//! # Design Philosophy
10//!
11//! - **Pure coordination**: Runtime doesn't own business logic, just orchestrates
12//! - **Sync execution**: All processing happens synchronously (async is driver responsibility)
13//! - **Panic safety**: Tasks that panic are caught and don't crash the runtime
14//! - **Graceful shutdown**: Supports clean shutdown with work draining
15//!
16//! # Example
17//!
18//! ```
19//! use reovim_kernel::api::v1::*;
20//!
21//! let mut runtime = Runtime::new();
22//! runtime.boot();
23//!
24//! // Schedule some work
25//! runtime.schedule_work(|| println!("Deferred work!"));
26//!
27//! // Process pending work and check if idle
28//! runtime.tick();
29//! assert!(runtime.is_idle());
30//!
31//! // Request shutdown
32//! runtime.shutdown();
33//! assert_eq!(runtime.state(), RuntimeState::Stopping);
34//! ```
35
36use std::{sync::Arc, time::Duration};
37
38use {
39 super::{
40 executor::Executor,
41 priority::PriorityQueue,
42 state::RuntimeState,
43 task::{Priority, Task},
44 timer::{DEFAULT_MAX_TIMERS, TimerHandle, TimerId, TimerWheel},
45 work_queue::WorkQueue,
46 },
47 crate::ipc::{DynEvent, EventBus, EventScope, Receiver, Sender, channel},
48};
49
50/// Default work queue capacity.
51pub const DEFAULT_WORK_QUEUE_CAPACITY: usize = 1024;
52
53/// Default priority queue capacity.
54pub const DEFAULT_PRIORITY_QUEUE_CAPACITY: usize = 256;
55
56/// Default batch size for task execution.
57pub const DEFAULT_BATCH_SIZE: usize = 16;
58
59/// Runtime configuration.
60///
61/// Used with [`Runtime::with_config()`] to customize runtime behavior.
62///
63/// # Example
64///
65/// ```
66/// use reovim_kernel::api::v1::*;
67///
68/// let config = RuntimeConfig {
69/// work_queue_capacity: 2048,
70/// priority_queue_capacity: 512,
71/// batch_size: 32,
72/// max_timers: 512,
73/// };
74///
75/// let runtime = Runtime::with_config(config);
76/// ```
77#[derive(Debug, Clone, Copy)]
78pub struct RuntimeConfig {
79 /// Maximum tasks in the work queue.
80 pub work_queue_capacity: usize,
81
82 /// Maximum events in the priority queue.
83 pub priority_queue_capacity: usize,
84
85 /// Maximum tasks processed per tick.
86 pub batch_size: usize,
87
88 /// Maximum concurrent timers.
89 pub max_timers: usize,
90}
91
92impl Default for RuntimeConfig {
93 fn default() -> Self {
94 Self {
95 work_queue_capacity: DEFAULT_WORK_QUEUE_CAPACITY,
96 priority_queue_capacity: DEFAULT_PRIORITY_QUEUE_CAPACITY,
97 batch_size: DEFAULT_BATCH_SIZE,
98 max_timers: DEFAULT_MAX_TIMERS,
99 }
100 }
101}
102
103/// Commands that can be sent to the runtime.
104///
105/// Used for external control of the runtime lifecycle.
106#[derive(Debug)]
107pub enum RuntimeCommand {
108 /// Request graceful shutdown.
109 Shutdown,
110
111 /// Request emergency stop (panic recovery).
112 Emergency,
113
114 /// Schedule a task for execution.
115 ScheduleTask(Task),
116}
117
118/// Runtime execution statistics.
119///
120/// Provides insight into runtime operation for monitoring and debugging.
121#[derive(Debug, Clone, Copy, Default)]
122pub struct RuntimeStats {
123 /// Current runtime state.
124 pub state: RuntimeState,
125
126 /// Number of events in the priority queue.
127 pub event_queue_len: usize,
128
129 /// Number of tasks in the work queue.
130 pub work_queue_len: usize,
131
132 /// Number of tasks dropped due to overflow.
133 pub work_dropped: usize,
134
135 /// Total tasks successfully executed.
136 pub tasks_executed: u64,
137
138 /// Total tasks that failed (error or panic).
139 pub tasks_failed: u64,
140
141 /// Number of active (pending) timers.
142 pub active_timers: usize,
143
144 /// Number of timers dropped due to capacity limit.
145 pub timers_dropped: u64,
146}
147
148/// Runtime event loop coordinator.
149///
150/// The `Runtime` manages the main event loop, coordinating between:
151/// - Event processing (via [`PriorityQueue`] and [`EventBus`])
152/// - Task execution (via [`WorkQueue`] and [`Executor`])
153/// - Lifecycle management (boot, run, shutdown)
154///
155/// # Thread Safety
156///
157/// The runtime itself is `Send` but not `Sync` - it should be owned by a
158/// single thread that drives the event loop. However, the command channel
159/// and work queue can be accessed from other threads for scheduling.
160///
161/// # State Transitions
162///
163/// ```text
164/// Booting → Running → Stopping
165/// ↓ ↓ ↓
166/// └─────────┴──────────┴→ Emergency
167/// ```
168pub struct Runtime {
169 /// Current lifecycle state.
170 state: RuntimeState,
171
172 /// Event bus for pub/sub communication.
173 event_bus: Arc<EventBus>,
174
175 /// Priority queue for ordered event processing.
176 event_queue: PriorityQueue,
177
178 /// Work queue for deferred tasks.
179 work_queue: Arc<WorkQueue>,
180
181 /// Timer wheel for delayed/periodic work.
182 timer_wheel: Arc<TimerWheel>,
183
184 /// Task executor with panic handling.
185 executor: Executor,
186
187 /// Sender for runtime commands.
188 command_tx: Sender<RuntimeCommand>,
189
190 /// Receiver for runtime commands.
191 command_rx: Receiver<RuntimeCommand>,
192
193 /// Whether a render is pending.
194 render_pending: bool,
195
196 /// Current event scope for lifecycle tracking.
197 current_scope: Option<EventScope>,
198}
199
200impl Runtime {
201 /// Create a new runtime with default configuration.
202 #[must_use]
203 pub fn new() -> Self {
204 Self::with_config(RuntimeConfig::default())
205 }
206
207 /// Create a runtime with custom configuration.
208 #[must_use]
209 pub fn with_config(config: RuntimeConfig) -> Self {
210 let work_queue = Arc::new(WorkQueue::with_capacity(config.work_queue_capacity));
211 let executor = Executor::new(Arc::clone(&work_queue)).with_batch_size(config.batch_size);
212 let event_queue = PriorityQueue::with_capacity(config.priority_queue_capacity);
213 let timer_wheel = Arc::new(TimerWheel::with_max_timers(config.max_timers));
214 let (command_tx, command_rx) = channel();
215
216 Self {
217 state: RuntimeState::Booting,
218 event_bus: Arc::new(EventBus::new()),
219 event_queue,
220 work_queue,
221 timer_wheel,
222 executor,
223 command_tx,
224 command_rx,
225 render_pending: false,
226 current_scope: None,
227 }
228 }
229
230 /// Create a runtime with a shared event bus.
231 ///
232 /// This is useful when the event bus is shared with other subsystems.
233 #[must_use]
234 pub fn with_event_bus(mut self, event_bus: Arc<EventBus>) -> Self {
235 self.event_bus = event_bus;
236 self
237 }
238
239 /// Boot the runtime, transitioning from `Booting` to `Running`.
240 ///
241 /// # Panics
242 ///
243 /// Panics if called when not in `Booting` state.
244 pub fn boot(&mut self) {
245 assert_eq!(self.state, RuntimeState::Booting, "boot() called when not in Booting state");
246 self.state = RuntimeState::Running;
247 }
248
249 /// Get the current runtime state.
250 #[inline]
251 #[must_use]
252 pub const fn state(&self) -> RuntimeState {
253 self.state
254 }
255
256 /// Get a reference to the event bus.
257 #[inline]
258 #[must_use]
259 pub const fn event_bus(&self) -> &Arc<EventBus> {
260 &self.event_bus
261 }
262
263 /// Get a clone of the command sender.
264 ///
265 /// The sender can be used from other threads to send commands.
266 #[must_use]
267 pub fn command_sender(&self) -> Sender<RuntimeCommand> {
268 self.command_tx.clone()
269 }
270
271 /// Process one tick of the event loop.
272 ///
273 /// This method:
274 /// 1. Processes runtime commands
275 /// 2. Processes expired timers (schedules their callbacks as tasks)
276 /// 3. Dispatches events from the priority queue
277 /// 4. Executes tasks from the work queue
278 /// 5. Processes queued async events
279 ///
280 /// Returns `true` if the runtime should continue, `false` if it should stop.
281 #[cfg_attr(coverage_nightly, coverage(off))]
282 pub fn tick(&mut self) -> bool {
283 // Process commands first
284 self.process_commands();
285
286 // Check if we should stop
287 if self.state.is_terminal() {
288 // Drain remaining work before stopping (graceful shutdown)
289 if self.state == RuntimeState::Stopping {
290 self.executor.drain();
291 }
292 return false;
293 }
294
295 // Only process work when running
296 if self.state.is_running() {
297 // Process timers FIRST - may schedule new work
298 let timer_tasks = self.timer_wheel.tick(std::time::Instant::now());
299 for task in timer_tasks {
300 self.work_queue.push(task);
301 }
302
303 // Dispatch events from priority queue
304 self.dispatch_events();
305
306 // Execute tasks from work queue
307 self.executor.tick();
308
309 // Process queued async events
310 let _ = self.event_bus.process_queue();
311 }
312
313 true
314 }
315
316 /// Process pending runtime commands.
317 #[cfg_attr(coverage_nightly, coverage(off))]
318 fn process_commands(&mut self) {
319 while let Ok(command) = self.command_rx.try_recv() {
320 match command {
321 RuntimeCommand::Shutdown => {
322 if self.state == RuntimeState::Running {
323 self.state = RuntimeState::Stopping;
324 }
325 }
326 RuntimeCommand::Emergency => {
327 self.state = RuntimeState::Emergency;
328 }
329 RuntimeCommand::ScheduleTask(task) => {
330 if self.state.can_accept_work() {
331 self.work_queue.push(task);
332 }
333 }
334 }
335 }
336 }
337
338 /// Dispatch events from the priority queue to handlers.
339 #[cfg_attr(coverage_nightly, coverage(off))]
340 fn dispatch_events(&self) {
341 while let Some(event) = self.event_queue.pop() {
342 let _result = self.event_bus.dispatch(&event);
343 // Decrement scope if present
344 if let Some(ref scope) = self.current_scope
345 && scope.in_flight() > 0
346 {
347 scope.decrement();
348 }
349 }
350 }
351
352 /// Schedule a closure for deferred execution.
353 ///
354 /// Returns `false` if the queue is full or runtime is not accepting work.
355 pub fn schedule_work<F>(&self, work: F) -> bool
356 where
357 F: FnOnce() + Send + 'static,
358 {
359 if !self.state.can_accept_work() {
360 return false;
361 }
362 self.work_queue.push(Task::new(work))
363 }
364
365 /// Schedule a closure with specific priority.
366 ///
367 /// Returns `false` if the queue is full or runtime is not accepting work.
368 pub fn schedule_work_with_priority<F>(&self, priority: Priority, work: F) -> bool
369 where
370 F: FnOnce() + Send + 'static,
371 {
372 if !self.state.can_accept_work() {
373 return false;
374 }
375 self.work_queue.push(Task::with_priority(priority, work))
376 }
377
378 /// Schedule a pre-built task for execution.
379 ///
380 /// Returns `false` if the queue is full or runtime is not accepting work.
381 pub fn schedule_task(&self, task: Task) -> bool {
382 if !self.state.can_accept_work() {
383 return false;
384 }
385 self.work_queue.push(task)
386 }
387
388 /// Schedule work to execute after a delay (one-shot timer).
389 ///
390 /// The callback executes once after the delay passes, on the next tick
391 /// after the deadline. Returns a handle that cancels the timer when dropped.
392 ///
393 /// # Example
394 ///
395 /// ```
396 /// use reovim_kernel::api::v1::*;
397 /// use std::time::Duration;
398 ///
399 /// let mut runtime = Runtime::new();
400 /// runtime.boot();
401 ///
402 /// // Schedule work for 100ms from now
403 /// let handle = runtime.schedule_delayed(Duration::from_millis(100), || {
404 /// println!("Delayed work executed!");
405 /// });
406 ///
407 /// // Timer will fire on the tick after 100ms passes
408 /// // Drop handle to cancel, or let it fire
409 /// ```
410 pub fn schedule_delayed<F>(&self, delay: Duration, work: F) -> TimerHandle
411 where
412 F: FnOnce() + Send + 'static,
413 {
414 self.timer_wheel
415 .schedule_oneshot(delay, Priority::NORMAL, work)
416 }
417
418 /// Schedule work to execute periodically.
419 ///
420 /// The callback executes repeatedly at the specified interval. Returns a
421 /// handle that cancels the timer when dropped.
422 ///
423 /// # Example
424 ///
425 /// ```
426 /// use reovim_kernel::api::v1::*;
427 /// use std::time::Duration;
428 ///
429 /// let mut runtime = Runtime::new();
430 /// runtime.boot();
431 ///
432 /// // Schedule periodic work every 50ms
433 /// let handle = runtime.schedule_periodic(Duration::from_millis(50), || {
434 /// println!("Periodic work!");
435 /// });
436 ///
437 /// // Timer fires every 50ms until handle is dropped
438 /// ```
439 pub fn schedule_periodic<F>(&self, interval: Duration, work: F) -> TimerHandle
440 where
441 F: Fn() + Send + Sync + 'static,
442 {
443 self.timer_wheel
444 .schedule_periodic(interval, Priority::NORMAL, work)
445 }
446
447 /// Cancel a timer by ID.
448 ///
449 /// Returns `true` if the timer was found and cancelled, `false` if it
450 /// was already cancelled or has already fired.
451 ///
452 /// Note: Timers are automatically cancelled when their [`TimerHandle`] is
453 /// dropped, so explicit cancellation is rarely needed.
454 pub fn cancel_timer(&self, id: TimerId) -> bool {
455 self.timer_wheel.cancel(id)
456 }
457
458 /// Get a reference to the timer wheel.
459 ///
460 /// This allows external code to schedule timers directly with custom
461 /// configuration.
462 #[must_use]
463 pub const fn timer_wheel(&self) -> &Arc<TimerWheel> {
464 &self.timer_wheel
465 }
466
467 /// Queue an event for priority-ordered processing.
468 ///
469 /// Returns `false` if the queue is full.
470 pub fn queue_event(&self, event: DynEvent) -> bool {
471 self.event_queue.push(event)
472 }
473
474 /// Request a render on the next tick.
475 pub const fn request_render(&mut self) {
476 self.render_pending = true;
477 }
478
479 /// Check and clear the render pending flag.
480 ///
481 /// Returns `true` if a render was requested since the last call.
482 pub const fn take_render_pending(&mut self) -> bool {
483 let pending = self.render_pending;
484 self.render_pending = false;
485 pending
486 }
487
488 /// Check if a render is pending.
489 #[inline]
490 #[must_use]
491 pub const fn is_render_pending(&self) -> bool {
492 self.render_pending
493 }
494
495 /// Set the current event scope for lifecycle tracking.
496 pub fn set_scope(&mut self, scope: EventScope) {
497 self.current_scope = Some(scope);
498 }
499
500 /// Clear the current event scope.
501 pub fn clear_scope(&mut self) {
502 self.current_scope = None;
503 }
504
505 /// Get a reference to the current scope.
506 #[must_use]
507 pub const fn current_scope(&self) -> Option<&EventScope> {
508 self.current_scope.as_ref()
509 }
510
511 /// Request graceful shutdown.
512 ///
513 /// The runtime will finish processing current work before stopping.
514 pub fn shutdown(&mut self) {
515 if self.state == RuntimeState::Running {
516 self.state = RuntimeState::Stopping;
517 }
518 }
519
520 /// Request emergency stop.
521 ///
522 /// The runtime will stop immediately without draining work.
523 pub const fn emergency_stop(&mut self) {
524 self.state = RuntimeState::Emergency;
525 }
526
527 /// Check if the runtime is idle (no pending work).
528 #[must_use]
529 #[cfg_attr(coverage_nightly, coverage(off))]
530 pub fn is_idle(&self) -> bool {
531 self.event_queue.is_empty()
532 && self.work_queue.is_empty()
533 && self.event_bus.queue_is_empty()
534 && self.timer_wheel.pending_count() == 0
535 }
536
537 /// Get runtime statistics.
538 #[must_use]
539 pub fn stats(&self) -> RuntimeStats {
540 RuntimeStats {
541 state: self.state,
542 event_queue_len: self.event_queue.len(),
543 work_queue_len: self.work_queue.len(),
544 work_dropped: self.work_queue.dropped_count(),
545 tasks_executed: self.executor.executed_count(),
546 tasks_failed: self.executor.failed_count(),
547 active_timers: self.timer_wheel.pending_count(),
548 timers_dropped: self.timer_wheel.dropped_count(),
549 }
550 }
551
552 /// Get a reference to the work queue.
553 ///
554 /// This allows external code to schedule tasks directly.
555 #[must_use]
556 pub const fn work_queue(&self) -> &Arc<WorkQueue> {
557 &self.work_queue
558 }
559
560 /// Run until the runtime stops.
561 ///
562 /// This is a convenience method that calls `tick()` in a loop.
563 /// For more control, use `tick()` directly.
564 #[cfg_attr(coverage_nightly, coverage(off))]
565 pub fn run(&mut self) {
566 while self.tick() {}
567 }
568}
569
570impl Default for Runtime {
571 fn default() -> Self {
572 Self::new()
573 }
574}
575
576impl std::fmt::Debug for Runtime {
577 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
578 f.debug_struct("Runtime")
579 .field("state", &self.state)
580 .field("event_queue_len", &self.event_queue.len())
581 .field("work_queue_len", &self.work_queue.len())
582 .field("render_pending", &self.render_pending)
583 .field("has_scope", &self.current_scope.is_some())
584 .finish_non_exhaustive()
585 }
586}