reovim-kernel 0.14.4

Core kernel mechanisms for reovim (Linux kernel/ equivalent)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
//! Runtime event loop coordinator.
//!
//! Linux equivalent: Main scheduler in `kernel/sched/core.c`
//!
//! The [`Runtime`] is the central coordinator for event processing and task
//! execution. It manages the event loop lifecycle, processes events from the
//! priority queue, and executes deferred tasks.
//!
//! # Design Philosophy
//!
//! - **Pure coordination**: Runtime doesn't own business logic, just orchestrates
//! - **Sync execution**: All processing happens synchronously (async is driver responsibility)
//! - **Panic safety**: Tasks that panic are caught and don't crash the runtime
//! - **Graceful shutdown**: Supports clean shutdown with work draining
//!
//! # Example
//!
//! ```
//! use reovim_kernel::api::v1::*;
//!
//! let mut runtime = Runtime::new();
//! runtime.boot();
//!
//! // Schedule some work
//! runtime.schedule_work(|| println!("Deferred work!"));
//!
//! // Process pending work and check if idle
//! runtime.tick();
//! assert!(runtime.is_idle());
//!
//! // Request shutdown
//! runtime.shutdown();
//! assert_eq!(runtime.state(), RuntimeState::Stopping);
//! ```

use std::{sync::Arc, time::Duration};

use {
    super::{
        executor::Executor,
        priority::PriorityQueue,
        state::RuntimeState,
        task::{Priority, Task},
        timer::{DEFAULT_MAX_TIMERS, TimerHandle, TimerId, TimerWheel},
        work_queue::WorkQueue,
    },
    crate::ipc::{DynEvent, EventBus, EventScope, Receiver, Sender, channel},
};

/// Default work queue capacity.
///
/// Used for `RuntimeConfig::work_queue_capacity` by `RuntimeConfig::default()`.
pub const DEFAULT_WORK_QUEUE_CAPACITY: usize = 1024;

/// Default priority queue capacity.
///
/// Used for `RuntimeConfig::priority_queue_capacity` by `RuntimeConfig::default()`.
pub const DEFAULT_PRIORITY_QUEUE_CAPACITY: usize = 256;

/// Default batch size for task execution.
///
/// Used for `RuntimeConfig::batch_size` by `RuntimeConfig::default()`.
pub const DEFAULT_BATCH_SIZE: usize = 16;

/// Runtime configuration.
///
/// Used with [`Runtime::with_config()`] to customize runtime behavior.
/// `RuntimeConfig::default()` fills every field from the `DEFAULT_*`
/// constants, and each field is consumed exactly once, while the runtime's
/// subsystems are being constructed.
///
/// # Example
///
/// ```
/// use reovim_kernel::api::v1::*;
///
/// let config = RuntimeConfig {
///     work_queue_capacity: 2048,
///     priority_queue_capacity: 512,
///     batch_size: 32,
///     max_timers: 512,
/// };
///
/// let runtime = Runtime::with_config(config);
/// ```
#[derive(Debug, Clone, Copy)]
pub struct RuntimeConfig {
    /// Maximum tasks in the work queue.
    ///
    /// Passed to `WorkQueue::with_capacity` by [`Runtime::with_config()`].
    pub work_queue_capacity: usize,

    /// Maximum events in the priority queue.
    ///
    /// Passed to `PriorityQueue::with_capacity` by [`Runtime::with_config()`].
    pub priority_queue_capacity: usize,

    /// Maximum tasks processed per tick.
    ///
    /// Passed to `Executor::with_batch_size` by [`Runtime::with_config()`].
    pub batch_size: usize,

    /// Maximum concurrent timers.
    ///
    /// Passed to `TimerWheel::with_max_timers` by [`Runtime::with_config()`].
    pub max_timers: usize,
}

impl Default for RuntimeConfig {
    /// Build the stock configuration: every limit comes from the matching
    /// `DEFAULT_*` constant.
    fn default() -> Self {
        let work_queue_capacity = DEFAULT_WORK_QUEUE_CAPACITY;
        let priority_queue_capacity = DEFAULT_PRIORITY_QUEUE_CAPACITY;
        let batch_size = DEFAULT_BATCH_SIZE;
        let max_timers = DEFAULT_MAX_TIMERS;

        Self {
            work_queue_capacity,
            priority_queue_capacity,
            batch_size,
            max_timers,
        }
    }
}

/// Commands that can be sent to the runtime.
///
/// Used for external control of the runtime lifecycle. Commands are sent
/// through the sender returned by `Runtime::command_sender()` and are
/// consumed at the start of each `Runtime::tick()`, before any other work.
#[derive(Debug)]
pub enum RuntimeCommand {
    /// Request graceful shutdown.
    ///
    /// Moves the runtime from `Running` to `Stopping`; ignored in any other
    /// state.
    Shutdown,

    /// Request emergency stop (panic recovery).
    ///
    /// Sets the state to `Emergency` regardless of the current state.
    Emergency,

    /// Schedule a task for execution.
    ///
    /// The task is pushed onto the work queue only if the current state
    /// reports `can_accept_work()`; otherwise it is discarded.
    ScheduleTask(Task),
}

/// Runtime execution statistics.
///
/// Provides insight into runtime operation for monitoring and debugging.
/// A value of this type is a point-in-time snapshot produced by
/// `Runtime::stats()`; it is `Copy` and does not update after creation.
#[derive(Debug, Clone, Copy, Default)]
pub struct RuntimeStats {
    /// Current runtime state.
    pub state: RuntimeState,

    /// Number of events in the priority queue.
    pub event_queue_len: usize,

    /// Number of tasks in the work queue.
    pub work_queue_len: usize,

    /// Number of tasks dropped due to overflow.
    ///
    /// Read from the work queue's `dropped_count()`.
    pub work_dropped: usize,

    /// Total tasks successfully executed.
    ///
    /// Read from the executor's `executed_count()`.
    pub tasks_executed: u64,

    /// Total tasks that failed (error or panic).
    ///
    /// Read from the executor's `failed_count()`.
    pub tasks_failed: u64,

    /// Number of active (pending) timers.
    ///
    /// Read from the timer wheel's `pending_count()`.
    pub active_timers: usize,

    /// Number of timers dropped due to capacity limit.
    ///
    /// Read from the timer wheel's `dropped_count()`.
    pub timers_dropped: u64,
}

/// Runtime event loop coordinator.
///
/// The `Runtime` manages the main event loop, coordinating between:
/// - Event processing (via [`PriorityQueue`] and [`EventBus`])
/// - Task execution (via [`WorkQueue`] and [`Executor`])
/// - Lifecycle management (boot, run, shutdown)
///
/// # Thread Safety
///
/// The runtime itself is `Send` but not `Sync` - it should be owned by a
/// single thread that drives the event loop. However, the command channel
/// and work queue can be accessed from other threads for scheduling.
///
/// # State Transitions
///
/// ```text
/// Booting → Running → Stopping
///    ↓         ↓          ↓
///    └─────────┴──────────┴→ Emergency
/// ```
pub struct Runtime {
    /// Current lifecycle state.
    state: RuntimeState,

    /// Event bus for pub/sub communication.
    ///
    /// Held behind an `Arc` so it can be handed out via `event_bus()` or
    /// swapped for an externally owned bus via `with_event_bus()`.
    event_bus: Arc<EventBus>,

    /// Priority queue for ordered event processing.
    ///
    /// Filled by `queue_event()` and emptied by `dispatch_events()`.
    event_queue: PriorityQueue,

    /// Work queue for deferred tasks.
    ///
    /// The same `Arc` is given to the executor at construction time.
    work_queue: Arc<WorkQueue>,

    /// Timer wheel for delayed/periodic work.
    ///
    /// Tasks it returns from `tick()` are pushed onto `work_queue`.
    timer_wheel: Arc<TimerWheel>,

    /// Task executor with panic handling.
    executor: Executor,

    /// Sender for runtime commands.
    ///
    /// Kept so that `command_sender()` can hand out clones.
    command_tx: Sender<RuntimeCommand>,

    /// Receiver for runtime commands.
    ///
    /// Polled with `try_recv()` at the start of every tick.
    command_rx: Receiver<RuntimeCommand>,

    /// Whether a render is pending.
    ///
    /// Set by `request_render()` and reset by `take_render_pending()`.
    render_pending: bool,

    /// Current event scope for lifecycle tracking.
    ///
    /// When set, it is decremented once per dispatched event while its
    /// in-flight count is above zero.
    current_scope: Option<EventScope>,
}

impl Runtime {
    /// Create a new runtime using the default [`RuntimeConfig`].
    ///
    /// Equivalent to passing `RuntimeConfig::default()` to
    /// [`Runtime::with_config()`].
    #[must_use]
    pub fn new() -> Self {
        let defaults = RuntimeConfig::default();
        Self::with_config(defaults)
    }

    /// Create a runtime with custom configuration.
    #[must_use]
    pub fn with_config(config: RuntimeConfig) -> Self {
        let work_queue = Arc::new(WorkQueue::with_capacity(config.work_queue_capacity));
        let executor = Executor::new(Arc::clone(&work_queue)).with_batch_size(config.batch_size);
        let event_queue = PriorityQueue::with_capacity(config.priority_queue_capacity);
        let timer_wheel = Arc::new(TimerWheel::with_max_timers(config.max_timers));
        let (command_tx, command_rx) = channel();

        Self {
            state: RuntimeState::Booting,
            event_bus: Arc::new(EventBus::new()),
            event_queue,
            work_queue,
            timer_wheel,
            executor,
            command_tx,
            command_rx,
            render_pending: false,
            current_scope: None,
        }
    }

    /// Create a runtime with a shared event bus.
    ///
    /// This is useful when the event bus is shared with other subsystems.
    ///
    /// Builder-style: consumes `self`, replaces the event bus that the
    /// constructor created with `event_bus`, and returns the runtime. All
    /// other fields are left as they were.
    #[must_use]
    pub fn with_event_bus(mut self, event_bus: Arc<EventBus>) -> Self {
        // The runtime's reference to the previously held bus is released here.
        self.event_bus = event_bus;
        self
    }

    /// Boot the runtime, transitioning from `Booting` to `Running`.
    ///
    /// No method in this `impl` block sets the state back to `Booting`, so
    /// calling `boot()` a second time on the same runtime panics.
    ///
    /// # Panics
    ///
    /// Panics if called when not in `Booting` state.
    pub fn boot(&mut self) {
        // Calling boot() out of order is treated as a programming error.
        assert_eq!(self.state, RuntimeState::Booting, "boot() called when not in Booting state");
        self.state = RuntimeState::Running;
    }

    /// Get the current runtime state.
    ///
    /// Returns a copy of the lifecycle state. Commands sent through the
    /// command channel only take effect during `tick()`, so the value does
    /// not reflect commands that are still queued.
    #[inline]
    #[must_use]
    pub const fn state(&self) -> RuntimeState {
        self.state
    }

    /// Get a reference to the event bus.
    ///
    /// The bus is returned as `&Arc<EventBus>`; clone the `Arc` to keep a
    /// handle that outlives the borrow of the runtime.
    #[inline]
    #[must_use]
    pub const fn event_bus(&self) -> &Arc<EventBus> {
        &self.event_bus
    }

    /// Get a clone of the command sender.
    ///
    /// The sender can be used from other threads to send commands.
    #[must_use]
    pub fn command_sender(&self) -> Sender<RuntimeCommand> {
        self.command_tx.clone()
    }

    /// Process one tick of the event loop.
    ///
    /// This method:
    /// 1. Processes runtime commands
    /// 2. Processes expired timers (schedules their callbacks as tasks)
    /// 3. Dispatches events from the priority queue
    /// 4. Executes tasks from the work queue
    /// 5. Processes queued async events
    ///
    /// Steps 2-5 only run while the state reports `is_running()`. If the
    /// state is neither terminal nor running, the tick handles commands and
    /// returns `true` without doing any other work.
    ///
    /// Returns `true` if the runtime should continue, `false` if it should stop.
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn tick(&mut self) -> bool {
        // Commands go first: they may change the state examined below.
        self.process_commands();

        if self.state.is_terminal() {
            // Graceful shutdown drains outstanding work; any other terminal
            // state stops without draining.
            if self.state == RuntimeState::Stopping {
                self.executor.drain();
            }
            return false;
        }

        // Not terminal, but not running either: nothing to process yet.
        if !self.state.is_running() {
            return true;
        }

        // Timers are handled before dispatch so the tasks they produce are
        // already queued when the executor runs later in this same tick.
        let now = std::time::Instant::now();
        let expired = self.timer_wheel.tick(now);
        for task in expired {
            // The push result is intentionally not inspected here.
            self.work_queue.push(task);
        }

        self.dispatch_events();
        self.executor.tick();

        // The result of processing the bus queue is not needed by the loop.
        let _ = self.event_bus.process_queue();

        true
    }

    /// Process pending runtime commands.
    ///
    /// Drains the command channel with non-blocking receives until it
    /// reports nothing more to read. Lifecycle commands go through the same
    /// `shutdown()` / `emergency_stop()` methods that direct callers use, so
    /// the transition rules live in one place. Commands keep being drained
    /// after a state change; a `ScheduleTask` that arrives while the state
    /// does not accept work is discarded.
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn process_commands(&mut self) {
        loop {
            let Ok(command) = self.command_rx.try_recv() else {
                break;
            };

            match command {
                RuntimeCommand::Shutdown => self.shutdown(),
                RuntimeCommand::Emergency => self.emergency_stop(),
                RuntimeCommand::ScheduleTask(task) => {
                    if self.state.can_accept_work() {
                        self.work_queue.push(task);
                    }
                }
            }
        }
    }

    /// Dispatch events from the priority queue to handlers.
    #[cfg_attr(coverage_nightly, coverage(off))]
    fn dispatch_events(&self) {
        while let Some(event) = self.event_queue.pop() {
            let _result = self.event_bus.dispatch(&event);
            // Decrement scope if present
            if let Some(ref scope) = self.current_scope
                && scope.in_flight() > 0
            {
                scope.decrement();
            }
        }
    }

    /// Schedule a closure for deferred execution.
    ///
    /// The closure is wrapped with `Task::new` and pushed onto the work
    /// queue. When the state does not accept work, no task is built and
    /// nothing is pushed.
    ///
    /// Returns `false` if the queue is full or runtime is not accepting work.
    pub fn schedule_work<F>(&self, work: F) -> bool
    where
        F: FnOnce() + Send + 'static,
    {
        // `&&` short-circuits, so the queue is untouched when work is refused.
        self.state.can_accept_work() && self.work_queue.push(Task::new(work))
    }

    /// Schedule a closure with specific priority.
    ///
    /// Same as `schedule_work`, except the task is built with
    /// `Task::with_priority(priority, work)`.
    ///
    /// Returns `false` if the queue is full or runtime is not accepting work.
    pub fn schedule_work_with_priority<F>(&self, priority: Priority, work: F) -> bool
    where
        F: FnOnce() + Send + 'static,
    {
        let accepting = self.state.can_accept_work();
        if accepting {
            self.work_queue.push(Task::with_priority(priority, work))
        } else {
            false
        }
    }

    /// Schedule a pre-built task for execution.
    ///
    /// The task is pushed onto the work queue as-is. When the state does not
    /// accept work, the task is dropped without being queued.
    ///
    /// Returns `false` if the queue is full or runtime is not accepting work.
    pub fn schedule_task(&self, task: Task) -> bool {
        self.state.can_accept_work() && self.work_queue.push(task)
    }

    /// Schedule work to execute after a delay (one-shot timer).
    ///
    /// The callback executes once after the delay passes, on the next tick
    /// after the deadline. Returns a handle that cancels the timer when dropped.
    ///
    /// The timer is registered with `Priority::NORMAL`. Unlike
    /// `schedule_work`, this method does not check the runtime state before
    /// registering; expired timers are only collected by `tick()` while the
    /// state reports `is_running()`.
    ///
    /// # Example
    ///
    /// ```
    /// use reovim_kernel::api::v1::*;
    /// use std::time::Duration;
    ///
    /// let mut runtime = Runtime::new();
    /// runtime.boot();
    ///
    /// // Schedule work for 100ms from now
    /// let handle = runtime.schedule_delayed(Duration::from_millis(100), || {
    ///     println!("Delayed work executed!");
    /// });
    ///
    /// // Timer will fire on the tick after 100ms passes
    /// // Drop handle to cancel, or let it fire
    /// ```
    pub fn schedule_delayed<F>(&self, delay: Duration, work: F) -> TimerHandle
    where
        F: FnOnce() + Send + 'static,
    {
        self.timer_wheel
            .schedule_oneshot(delay, Priority::NORMAL, work)
    }

    /// Schedule work to execute periodically.
    ///
    /// The callback executes repeatedly at the specified interval. Returns a
    /// handle that cancels the timer when dropped.
    ///
    /// The timer is registered with `Priority::NORMAL`. Because the callback
    /// runs more than once it must be `Fn` (not `FnOnce`) and `Sync`. Unlike
    /// `schedule_work`, this method does not check the runtime state before
    /// registering.
    ///
    /// # Example
    ///
    /// ```
    /// use reovim_kernel::api::v1::*;
    /// use std::time::Duration;
    ///
    /// let mut runtime = Runtime::new();
    /// runtime.boot();
    ///
    /// // Schedule periodic work every 50ms
    /// let handle = runtime.schedule_periodic(Duration::from_millis(50), || {
    ///     println!("Periodic work!");
    /// });
    ///
    /// // Timer fires every 50ms until handle is dropped
    /// ```
    pub fn schedule_periodic<F>(&self, interval: Duration, work: F) -> TimerHandle
    where
        F: Fn() + Send + Sync + 'static,
    {
        self.timer_wheel
            .schedule_periodic(interval, Priority::NORMAL, work)
    }

    /// Cancel a timer by ID.
    ///
    /// Returns `true` if the timer was found and cancelled, `false` if it
    /// was already cancelled or has already fired.
    ///
    /// Note: Timers are automatically cancelled when their [`TimerHandle`] is
    /// dropped, so explicit cancellation is rarely needed.
    ///
    /// This is a thin wrapper: the call and its result are forwarded
    /// unchanged to the timer wheel's `cancel`.
    pub fn cancel_timer(&self, id: TimerId) -> bool {
        self.timer_wheel.cancel(id)
    }

    /// Get a reference to the timer wheel.
    ///
    /// This allows external code to schedule timers directly with custom
    /// configuration.
    ///
    /// The wheel is returned as `&Arc<TimerWheel>`; clone the `Arc` to keep a
    /// handle beyond the borrow of the runtime.
    #[must_use]
    pub const fn timer_wheel(&self) -> &Arc<TimerWheel> {
        &self.timer_wheel
    }

    /// Queue an event for priority-ordered processing.
    ///
    /// The event is held in the priority queue until a later `tick()` pops
    /// it and dispatches it on the event bus. This method does not check the
    /// runtime state before queuing.
    ///
    /// Returns `false` if the queue is full.
    pub fn queue_event(&self, event: DynEvent) -> bool {
        self.event_queue.push(event)
    }

    /// Mark that a render has been requested.
    ///
    /// This only sets the pending flag. Nothing in this `impl` block acts on
    /// it during `tick()`; the flag is observed through
    /// `is_render_pending()` and consumed through `take_render_pending()`.
    pub const fn request_render(&mut self) {
        self.render_pending = true;
    }

    /// Check and clear the render pending flag.
    ///
    /// The flag is reset to `false` in the same step, so a second call
    /// without an intervening `request_render()` returns `false`.
    ///
    /// Returns `true` if a render was requested since the last call.
    pub const fn take_render_pending(&mut self) -> bool {
        std::mem::replace(&mut self.render_pending, false)
    }

    /// Check if a render is pending.
    ///
    /// Read-only counterpart of `take_render_pending()`: the flag is left
    /// untouched.
    #[inline]
    #[must_use]
    pub const fn is_render_pending(&self) -> bool {
        self.render_pending
    }

    /// Set the current event scope for lifecycle tracking.
    ///
    /// Replaces any scope that was set before. While a scope is set, it is
    /// decremented once per dispatched event as long as its in-flight count
    /// is above zero.
    pub fn set_scope(&mut self, scope: EventScope) {
        self.current_scope = Some(scope);
    }

    /// Clear the current event scope.
    ///
    /// After this call, dispatched events no longer decrement any scope.
    /// Calling it when no scope is set has no effect.
    pub fn clear_scope(&mut self) {
        self.current_scope = None;
    }

    /// Get a reference to the current scope.
    ///
    /// Returns `None` when no scope has been set, or after `clear_scope()`.
    #[must_use]
    pub const fn current_scope(&self) -> Option<&EventScope> {
        self.current_scope.as_ref()
    }

    /// Request graceful shutdown.
    ///
    /// The runtime will finish processing current work before stopping.
    ///
    /// Only a `Running` runtime moves to `Stopping`; in any other state the
    /// call leaves the state unchanged.
    pub fn shutdown(&mut self) {
        if self.state != RuntimeState::Running {
            return;
        }
        self.state = RuntimeState::Stopping;
    }

    /// Request emergency stop.
    ///
    /// The runtime will stop immediately without draining work.
    ///
    /// Unlike `shutdown()`, the transition is unconditional: the state is
    /// set to `Emergency` whatever it was before.
    pub const fn emergency_stop(&mut self) {
        self.state = RuntimeState::Emergency;
    }

    /// Check if the runtime is idle (no pending work).
    ///
    /// Idle means all four sources of work are empty: the priority event
    /// queue, the work queue, the event bus queue and the timer wheel. The
    /// checks run in that order and stop at the first non-empty source.
    #[must_use]
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn is_idle(&self) -> bool {
        if !self.event_queue.is_empty() {
            return false;
        }
        if !self.work_queue.is_empty() {
            return false;
        }
        if !self.event_bus.queue_is_empty() {
            return false;
        }
        self.timer_wheel.pending_count() == 0
    }

    /// Get runtime statistics.
    ///
    /// Collects a snapshot of the current state, queue lengths, executor
    /// counters and timer counters. Each value is read separately, one after
    /// another.
    #[must_use]
    pub fn stats(&self) -> RuntimeStats {
        let state = self.state;

        // Queue depths and overflow count.
        let event_queue_len = self.event_queue.len();
        let work_queue_len = self.work_queue.len();
        let work_dropped = self.work_queue.dropped_count();

        // Executor counters.
        let tasks_executed = self.executor.executed_count();
        let tasks_failed = self.executor.failed_count();

        // Timer wheel counters.
        let active_timers = self.timer_wheel.pending_count();
        let timers_dropped = self.timer_wheel.dropped_count();

        RuntimeStats {
            state,
            event_queue_len,
            work_queue_len,
            work_dropped,
            tasks_executed,
            tasks_failed,
            active_timers,
            timers_dropped,
        }
    }

    /// Get a reference to the work queue.
    ///
    /// This allows external code to schedule tasks directly.
    ///
    /// Pushing through this handle bypasses the `can_accept_work()` check
    /// that `schedule_work` and `schedule_task` perform.
    #[must_use]
    pub const fn work_queue(&self) -> &Arc<WorkQueue> {
        &self.work_queue
    }

    /// Run until the runtime stops.
    ///
    /// This is a convenience method that calls `tick()` in a loop.
    /// For more control, use `tick()` directly.
    ///
    /// The loop ends the first time `tick()` returns `false`.
    // NOTE(review): there is no explicit wait between iterations here;
    // confirm that spinning while idle is acceptable for callers.
    #[cfg_attr(coverage_nightly, coverage(off))]
    pub fn run(&mut self) {
        loop {
            let keep_going = self.tick();
            if !keep_going {
                break;
            }
        }
    }
}

impl Default for Runtime {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Debug for Runtime {
    /// Format a compact summary: state, queue lengths, the render flag and
    /// whether a scope is set. Remaining fields are omitted and the output
    /// is marked non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let event_queue_len = self.event_queue.len();
        let work_queue_len = self.work_queue.len();
        let has_scope = self.current_scope.is_some();

        let mut summary = f.debug_struct("Runtime");
        summary.field("state", &self.state);
        summary.field("event_queue_len", &event_queue_len);
        summary.field("work_queue_len", &work_queue_len);
        summary.field("render_pending", &self.render_pending);
        summary.field("has_scope", &has_scope);
        summary.finish_non_exhaustive()
    }
}