Skip to main content

reovim_kernel/sched/
runtime.rs

1//! Runtime event loop coordinator.
2//!
3//! Linux equivalent: Main scheduler in `kernel/sched/core.c`
4//!
5//! The [`Runtime`] is the central coordinator for event processing and task
6//! execution. It manages the event loop lifecycle, processes events from the
7//! priority queue, and executes deferred tasks.
8//!
9//! # Design Philosophy
10//!
11//! - **Pure coordination**: Runtime doesn't own business logic, just orchestrates
12//! - **Sync execution**: All processing happens synchronously (async is driver responsibility)
13//! - **Panic safety**: Tasks that panic are caught and don't crash the runtime
14//! - **Graceful shutdown**: Supports clean shutdown with work draining
15//!
16//! # Example
17//!
18//! ```
19//! use reovim_kernel::api::v1::*;
20//!
21//! let mut runtime = Runtime::new();
22//! runtime.boot();
23//!
24//! // Schedule some work
25//! runtime.schedule_work(|| println!("Deferred work!"));
26//!
27//! // Process pending work and check if idle
28//! runtime.tick();
29//! assert!(runtime.is_idle());
30//!
31//! // Request shutdown
32//! runtime.shutdown();
33//! assert_eq!(runtime.state(), RuntimeState::Stopping);
34//! ```
35
36use std::{sync::Arc, time::Duration};
37
38use {
39    super::{
40        executor::Executor,
41        priority::PriorityQueue,
42        state::RuntimeState,
43        task::{Priority, Task},
44        timer::{DEFAULT_MAX_TIMERS, TimerHandle, TimerId, TimerWheel},
45        work_queue::WorkQueue,
46    },
47    crate::ipc::{DynEvent, EventBus, EventScope, Receiver, Sender, channel},
48};
49
/// Default value of [`RuntimeConfig::work_queue_capacity`] (1024 tasks).
pub const DEFAULT_WORK_QUEUE_CAPACITY: usize = 1 << 10;

/// Default value of [`RuntimeConfig::priority_queue_capacity`] (256 events).
pub const DEFAULT_PRIORITY_QUEUE_CAPACITY: usize = 1 << 8;

/// Default value of [`RuntimeConfig::batch_size`] (16 tasks).
pub const DEFAULT_BATCH_SIZE: usize = 1 << 4;
58
/// Tunable limits for a [`Runtime`].
///
/// Pass a value to [`Runtime::with_config()`] to size the runtime's queues,
/// executor batches and timer wheel. The type is plain data: it is `Copy`,
/// and two configurations compare equal when all four limits match.
///
/// # Example
///
/// ```
/// use reovim_kernel::api::v1::*;
///
/// let config = RuntimeConfig {
///     work_queue_capacity: 2048,
///     priority_queue_capacity: 512,
///     batch_size: 32,
///     max_timers: 512,
/// };
///
/// let runtime = Runtime::with_config(config);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeConfig {
    /// Maximum tasks in the work queue.
    pub work_queue_capacity: usize,

    /// Maximum events in the priority queue.
    pub priority_queue_capacity: usize,

    /// Maximum tasks processed per tick.
    pub batch_size: usize,

    /// Maximum concurrent timers.
    pub max_timers: usize,
}
91
92impl Default for RuntimeConfig {
93    fn default() -> Self {
94        Self {
95            work_queue_capacity: DEFAULT_WORK_QUEUE_CAPACITY,
96            priority_queue_capacity: DEFAULT_PRIORITY_QUEUE_CAPACITY,
97            batch_size: DEFAULT_BATCH_SIZE,
98            max_timers: DEFAULT_MAX_TIMERS,
99        }
100    }
101}
102
103/// Commands that can be sent to the runtime.
104///
105/// Used for external control of the runtime lifecycle.
106#[derive(Debug)]
107pub enum RuntimeCommand {
108    /// Request graceful shutdown.
109    Shutdown,
110
111    /// Request emergency stop (panic recovery).
112    Emergency,
113
114    /// Schedule a task for execution.
115    ScheduleTask(Task),
116}
117
118/// Runtime execution statistics.
119///
120/// Provides insight into runtime operation for monitoring and debugging.
121#[derive(Debug, Clone, Copy, Default)]
122pub struct RuntimeStats {
123    /// Current runtime state.
124    pub state: RuntimeState,
125
126    /// Number of events in the priority queue.
127    pub event_queue_len: usize,
128
129    /// Number of tasks in the work queue.
130    pub work_queue_len: usize,
131
132    /// Number of tasks dropped due to overflow.
133    pub work_dropped: usize,
134
135    /// Total tasks successfully executed.
136    pub tasks_executed: u64,
137
138    /// Total tasks that failed (error or panic).
139    pub tasks_failed: u64,
140
141    /// Number of active (pending) timers.
142    pub active_timers: usize,
143
144    /// Number of timers dropped due to capacity limit.
145    pub timers_dropped: u64,
146}
147
148/// Runtime event loop coordinator.
149///
150/// The `Runtime` manages the main event loop, coordinating between:
151/// - Event processing (via [`PriorityQueue`] and [`EventBus`])
152/// - Task execution (via [`WorkQueue`] and [`Executor`])
153/// - Lifecycle management (boot, run, shutdown)
154///
155/// # Thread Safety
156///
157/// The runtime itself is `Send` but not `Sync` - it should be owned by a
158/// single thread that drives the event loop. However, the command channel
159/// and work queue can be accessed from other threads for scheduling.
160///
161/// # State Transitions
162///
163/// ```text
164/// Booting → Running → Stopping
165///    ↓         ↓          ↓
166///    └─────────┴──────────┴→ Emergency
167/// ```
168pub struct Runtime {
169    /// Current lifecycle state.
170    state: RuntimeState,
171
172    /// Event bus for pub/sub communication.
173    event_bus: Arc<EventBus>,
174
175    /// Priority queue for ordered event processing.
176    event_queue: PriorityQueue,
177
178    /// Work queue for deferred tasks.
179    work_queue: Arc<WorkQueue>,
180
181    /// Timer wheel for delayed/periodic work.
182    timer_wheel: Arc<TimerWheel>,
183
184    /// Task executor with panic handling.
185    executor: Executor,
186
187    /// Sender for runtime commands.
188    command_tx: Sender<RuntimeCommand>,
189
190    /// Receiver for runtime commands.
191    command_rx: Receiver<RuntimeCommand>,
192
193    /// Whether a render is pending.
194    render_pending: bool,
195
196    /// Current event scope for lifecycle tracking.
197    current_scope: Option<EventScope>,
198}
199
200impl Runtime {
201    /// Create a new runtime with default configuration.
202    #[must_use]
203    pub fn new() -> Self {
204        Self::with_config(RuntimeConfig::default())
205    }
206
207    /// Create a runtime with custom configuration.
208    #[must_use]
209    pub fn with_config(config: RuntimeConfig) -> Self {
210        let work_queue = Arc::new(WorkQueue::with_capacity(config.work_queue_capacity));
211        let executor = Executor::new(Arc::clone(&work_queue)).with_batch_size(config.batch_size);
212        let event_queue = PriorityQueue::with_capacity(config.priority_queue_capacity);
213        let timer_wheel = Arc::new(TimerWheel::with_max_timers(config.max_timers));
214        let (command_tx, command_rx) = channel();
215
216        Self {
217            state: RuntimeState::Booting,
218            event_bus: Arc::new(EventBus::new()),
219            event_queue,
220            work_queue,
221            timer_wheel,
222            executor,
223            command_tx,
224            command_rx,
225            render_pending: false,
226            current_scope: None,
227        }
228    }
229
230    /// Create a runtime with a shared event bus.
231    ///
232    /// This is useful when the event bus is shared with other subsystems.
233    #[must_use]
234    pub fn with_event_bus(mut self, event_bus: Arc<EventBus>) -> Self {
235        self.event_bus = event_bus;
236        self
237    }
238
239    /// Boot the runtime, transitioning from `Booting` to `Running`.
240    ///
241    /// # Panics
242    ///
243    /// Panics if called when not in `Booting` state.
244    pub fn boot(&mut self) {
245        assert_eq!(self.state, RuntimeState::Booting, "boot() called when not in Booting state");
246        self.state = RuntimeState::Running;
247    }
248
249    /// Get the current runtime state.
250    #[inline]
251    #[must_use]
252    pub const fn state(&self) -> RuntimeState {
253        self.state
254    }
255
256    /// Get a reference to the event bus.
257    #[inline]
258    #[must_use]
259    pub const fn event_bus(&self) -> &Arc<EventBus> {
260        &self.event_bus
261    }
262
263    /// Get a clone of the command sender.
264    ///
265    /// The sender can be used from other threads to send commands.
266    #[must_use]
267    pub fn command_sender(&self) -> Sender<RuntimeCommand> {
268        self.command_tx.clone()
269    }
270
271    /// Process one tick of the event loop.
272    ///
273    /// This method:
274    /// 1. Processes runtime commands
275    /// 2. Processes expired timers (schedules their callbacks as tasks)
276    /// 3. Dispatches events from the priority queue
277    /// 4. Executes tasks from the work queue
278    /// 5. Processes queued async events
279    ///
280    /// Returns `true` if the runtime should continue, `false` if it should stop.
281    #[cfg_attr(coverage_nightly, coverage(off))]
282    pub fn tick(&mut self) -> bool {
283        // Process commands first
284        self.process_commands();
285
286        // Check if we should stop
287        if self.state.is_terminal() {
288            // Drain remaining work before stopping (graceful shutdown)
289            if self.state == RuntimeState::Stopping {
290                self.executor.drain();
291            }
292            return false;
293        }
294
295        // Only process work when running
296        if self.state.is_running() {
297            // Process timers FIRST - may schedule new work
298            let timer_tasks = self.timer_wheel.tick(std::time::Instant::now());
299            for task in timer_tasks {
300                self.work_queue.push(task);
301            }
302
303            // Dispatch events from priority queue
304            self.dispatch_events();
305
306            // Execute tasks from work queue
307            self.executor.tick();
308
309            // Process queued async events
310            let _ = self.event_bus.process_queue();
311        }
312
313        true
314    }
315
316    /// Process pending runtime commands.
317    #[cfg_attr(coverage_nightly, coverage(off))]
318    fn process_commands(&mut self) {
319        while let Ok(command) = self.command_rx.try_recv() {
320            match command {
321                RuntimeCommand::Shutdown => {
322                    if self.state == RuntimeState::Running {
323                        self.state = RuntimeState::Stopping;
324                    }
325                }
326                RuntimeCommand::Emergency => {
327                    self.state = RuntimeState::Emergency;
328                }
329                RuntimeCommand::ScheduleTask(task) => {
330                    if self.state.can_accept_work() {
331                        self.work_queue.push(task);
332                    }
333                }
334            }
335        }
336    }
337
338    /// Dispatch events from the priority queue to handlers.
339    #[cfg_attr(coverage_nightly, coverage(off))]
340    fn dispatch_events(&self) {
341        while let Some(event) = self.event_queue.pop() {
342            let _result = self.event_bus.dispatch(&event);
343            // Decrement scope if present
344            if let Some(ref scope) = self.current_scope
345                && scope.in_flight() > 0
346            {
347                scope.decrement();
348            }
349        }
350    }
351
352    /// Schedule a closure for deferred execution.
353    ///
354    /// Returns `false` if the queue is full or runtime is not accepting work.
355    pub fn schedule_work<F>(&self, work: F) -> bool
356    where
357        F: FnOnce() + Send + 'static,
358    {
359        if !self.state.can_accept_work() {
360            return false;
361        }
362        self.work_queue.push(Task::new(work))
363    }
364
365    /// Schedule a closure with specific priority.
366    ///
367    /// Returns `false` if the queue is full or runtime is not accepting work.
368    pub fn schedule_work_with_priority<F>(&self, priority: Priority, work: F) -> bool
369    where
370        F: FnOnce() + Send + 'static,
371    {
372        if !self.state.can_accept_work() {
373            return false;
374        }
375        self.work_queue.push(Task::with_priority(priority, work))
376    }
377
378    /// Schedule a pre-built task for execution.
379    ///
380    /// Returns `false` if the queue is full or runtime is not accepting work.
381    pub fn schedule_task(&self, task: Task) -> bool {
382        if !self.state.can_accept_work() {
383            return false;
384        }
385        self.work_queue.push(task)
386    }
387
388    /// Schedule work to execute after a delay (one-shot timer).
389    ///
390    /// The callback executes once after the delay passes, on the next tick
391    /// after the deadline. Returns a handle that cancels the timer when dropped.
392    ///
393    /// # Example
394    ///
395    /// ```
396    /// use reovim_kernel::api::v1::*;
397    /// use std::time::Duration;
398    ///
399    /// let mut runtime = Runtime::new();
400    /// runtime.boot();
401    ///
402    /// // Schedule work for 100ms from now
403    /// let handle = runtime.schedule_delayed(Duration::from_millis(100), || {
404    ///     println!("Delayed work executed!");
405    /// });
406    ///
407    /// // Timer will fire on the tick after 100ms passes
408    /// // Drop handle to cancel, or let it fire
409    /// ```
410    pub fn schedule_delayed<F>(&self, delay: Duration, work: F) -> TimerHandle
411    where
412        F: FnOnce() + Send + 'static,
413    {
414        self.timer_wheel
415            .schedule_oneshot(delay, Priority::NORMAL, work)
416    }
417
418    /// Schedule work to execute periodically.
419    ///
420    /// The callback executes repeatedly at the specified interval. Returns a
421    /// handle that cancels the timer when dropped.
422    ///
423    /// # Example
424    ///
425    /// ```
426    /// use reovim_kernel::api::v1::*;
427    /// use std::time::Duration;
428    ///
429    /// let mut runtime = Runtime::new();
430    /// runtime.boot();
431    ///
432    /// // Schedule periodic work every 50ms
433    /// let handle = runtime.schedule_periodic(Duration::from_millis(50), || {
434    ///     println!("Periodic work!");
435    /// });
436    ///
437    /// // Timer fires every 50ms until handle is dropped
438    /// ```
439    pub fn schedule_periodic<F>(&self, interval: Duration, work: F) -> TimerHandle
440    where
441        F: Fn() + Send + Sync + 'static,
442    {
443        self.timer_wheel
444            .schedule_periodic(interval, Priority::NORMAL, work)
445    }
446
447    /// Cancel a timer by ID.
448    ///
449    /// Returns `true` if the timer was found and cancelled, `false` if it
450    /// was already cancelled or has already fired.
451    ///
452    /// Note: Timers are automatically cancelled when their [`TimerHandle`] is
453    /// dropped, so explicit cancellation is rarely needed.
454    pub fn cancel_timer(&self, id: TimerId) -> bool {
455        self.timer_wheel.cancel(id)
456    }
457
458    /// Get a reference to the timer wheel.
459    ///
460    /// This allows external code to schedule timers directly with custom
461    /// configuration.
462    #[must_use]
463    pub const fn timer_wheel(&self) -> &Arc<TimerWheel> {
464        &self.timer_wheel
465    }
466
467    /// Queue an event for priority-ordered processing.
468    ///
469    /// Returns `false` if the queue is full.
470    pub fn queue_event(&self, event: DynEvent) -> bool {
471        self.event_queue.push(event)
472    }
473
474    /// Request a render on the next tick.
475    pub const fn request_render(&mut self) {
476        self.render_pending = true;
477    }
478
479    /// Check and clear the render pending flag.
480    ///
481    /// Returns `true` if a render was requested since the last call.
482    pub const fn take_render_pending(&mut self) -> bool {
483        let pending = self.render_pending;
484        self.render_pending = false;
485        pending
486    }
487
488    /// Check if a render is pending.
489    #[inline]
490    #[must_use]
491    pub const fn is_render_pending(&self) -> bool {
492        self.render_pending
493    }
494
495    /// Set the current event scope for lifecycle tracking.
496    pub fn set_scope(&mut self, scope: EventScope) {
497        self.current_scope = Some(scope);
498    }
499
500    /// Clear the current event scope.
501    pub fn clear_scope(&mut self) {
502        self.current_scope = None;
503    }
504
505    /// Get a reference to the current scope.
506    #[must_use]
507    pub const fn current_scope(&self) -> Option<&EventScope> {
508        self.current_scope.as_ref()
509    }
510
511    /// Request graceful shutdown.
512    ///
513    /// The runtime will finish processing current work before stopping.
514    pub fn shutdown(&mut self) {
515        if self.state == RuntimeState::Running {
516            self.state = RuntimeState::Stopping;
517        }
518    }
519
520    /// Request emergency stop.
521    ///
522    /// The runtime will stop immediately without draining work.
523    pub const fn emergency_stop(&mut self) {
524        self.state = RuntimeState::Emergency;
525    }
526
527    /// Check if the runtime is idle (no pending work).
528    #[must_use]
529    #[cfg_attr(coverage_nightly, coverage(off))]
530    pub fn is_idle(&self) -> bool {
531        self.event_queue.is_empty()
532            && self.work_queue.is_empty()
533            && self.event_bus.queue_is_empty()
534            && self.timer_wheel.pending_count() == 0
535    }
536
537    /// Get runtime statistics.
538    #[must_use]
539    pub fn stats(&self) -> RuntimeStats {
540        RuntimeStats {
541            state: self.state,
542            event_queue_len: self.event_queue.len(),
543            work_queue_len: self.work_queue.len(),
544            work_dropped: self.work_queue.dropped_count(),
545            tasks_executed: self.executor.executed_count(),
546            tasks_failed: self.executor.failed_count(),
547            active_timers: self.timer_wheel.pending_count(),
548            timers_dropped: self.timer_wheel.dropped_count(),
549        }
550    }
551
552    /// Get a reference to the work queue.
553    ///
554    /// This allows external code to schedule tasks directly.
555    #[must_use]
556    pub const fn work_queue(&self) -> &Arc<WorkQueue> {
557        &self.work_queue
558    }
559
560    /// Run until the runtime stops.
561    ///
562    /// This is a convenience method that calls `tick()` in a loop.
563    /// For more control, use `tick()` directly.
564    #[cfg_attr(coverage_nightly, coverage(off))]
565    pub fn run(&mut self) {
566        while self.tick() {}
567    }
568}
569
570impl Default for Runtime {
571    fn default() -> Self {
572        Self::new()
573    }
574}
575
576impl std::fmt::Debug for Runtime {
577    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
578        f.debug_struct("Runtime")
579            .field("state", &self.state)
580            .field("event_queue_len", &self.event_queue.len())
581            .field("work_queue_len", &self.work_queue.len())
582            .field("render_pending", &self.render_pending)
583            .field("has_scope", &self.current_scope.is_some())
584            .finish_non_exhaustive()
585    }
586}