use std::{sync::Arc, time::Duration};
use {
super::{
executor::Executor,
priority::PriorityQueue,
state::RuntimeState,
task::{Priority, Task},
timer::{DEFAULT_MAX_TIMERS, TimerHandle, TimerId, TimerWheel},
work_queue::WorkQueue,
},
crate::ipc::{DynEvent, EventBus, EventScope, Receiver, Sender, channel},
};
pub const DEFAULT_WORK_QUEUE_CAPACITY: usize = 1024;
pub const DEFAULT_PRIORITY_QUEUE_CAPACITY: usize = 256;
pub const DEFAULT_BATCH_SIZE: usize = 16;
#[derive(Debug, Clone, Copy)]
pub struct RuntimeConfig {
pub work_queue_capacity: usize,
pub priority_queue_capacity: usize,
pub batch_size: usize,
pub max_timers: usize,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
work_queue_capacity: DEFAULT_WORK_QUEUE_CAPACITY,
priority_queue_capacity: DEFAULT_PRIORITY_QUEUE_CAPACITY,
batch_size: DEFAULT_BATCH_SIZE,
max_timers: DEFAULT_MAX_TIMERS,
}
}
}
#[derive(Debug)]
pub enum RuntimeCommand {
Shutdown,
Emergency,
ScheduleTask(Task),
}
#[derive(Debug, Clone, Copy, Default)]
pub struct RuntimeStats {
pub state: RuntimeState,
pub event_queue_len: usize,
pub work_queue_len: usize,
pub work_dropped: usize,
pub tasks_executed: u64,
pub tasks_failed: u64,
pub active_timers: usize,
pub timers_dropped: u64,
}
pub struct Runtime {
state: RuntimeState,
event_bus: Arc<EventBus>,
event_queue: PriorityQueue,
work_queue: Arc<WorkQueue>,
timer_wheel: Arc<TimerWheel>,
executor: Executor,
command_tx: Sender<RuntimeCommand>,
command_rx: Receiver<RuntimeCommand>,
render_pending: bool,
current_scope: Option<EventScope>,
}
impl Runtime {
#[must_use]
pub fn new() -> Self {
Self::with_config(RuntimeConfig::default())
}
#[must_use]
pub fn with_config(config: RuntimeConfig) -> Self {
let work_queue = Arc::new(WorkQueue::with_capacity(config.work_queue_capacity));
let executor = Executor::new(Arc::clone(&work_queue)).with_batch_size(config.batch_size);
let event_queue = PriorityQueue::with_capacity(config.priority_queue_capacity);
let timer_wheel = Arc::new(TimerWheel::with_max_timers(config.max_timers));
let (command_tx, command_rx) = channel();
Self {
state: RuntimeState::Booting,
event_bus: Arc::new(EventBus::new()),
event_queue,
work_queue,
timer_wheel,
executor,
command_tx,
command_rx,
render_pending: false,
current_scope: None,
}
}
#[must_use]
pub fn with_event_bus(mut self, event_bus: Arc<EventBus>) -> Self {
self.event_bus = event_bus;
self
}
pub fn boot(&mut self) {
assert_eq!(self.state, RuntimeState::Booting, "boot() called when not in Booting state");
self.state = RuntimeState::Running;
}
#[inline]
#[must_use]
pub const fn state(&self) -> RuntimeState {
self.state
}
#[inline]
#[must_use]
pub const fn event_bus(&self) -> &Arc<EventBus> {
&self.event_bus
}
#[must_use]
pub fn command_sender(&self) -> Sender<RuntimeCommand> {
self.command_tx.clone()
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn tick(&mut self) -> bool {
self.process_commands();
if self.state.is_terminal() {
if self.state == RuntimeState::Stopping {
self.executor.drain();
}
return false;
}
if self.state.is_running() {
let timer_tasks = self.timer_wheel.tick(std::time::Instant::now());
for task in timer_tasks {
self.work_queue.push(task);
}
self.dispatch_events();
self.executor.tick();
let _ = self.event_bus.process_queue();
}
true
}
#[cfg_attr(coverage_nightly, coverage(off))]
fn process_commands(&mut self) {
while let Ok(command) = self.command_rx.try_recv() {
match command {
RuntimeCommand::Shutdown => {
if self.state == RuntimeState::Running {
self.state = RuntimeState::Stopping;
}
}
RuntimeCommand::Emergency => {
self.state = RuntimeState::Emergency;
}
RuntimeCommand::ScheduleTask(task) => {
if self.state.can_accept_work() {
self.work_queue.push(task);
}
}
}
}
}
#[cfg_attr(coverage_nightly, coverage(off))]
fn dispatch_events(&self) {
while let Some(event) = self.event_queue.pop() {
let _result = self.event_bus.dispatch(&event);
if let Some(ref scope) = self.current_scope
&& scope.in_flight() > 0
{
scope.decrement();
}
}
}
pub fn schedule_work<F>(&self, work: F) -> bool
where
F: FnOnce() + Send + 'static,
{
if !self.state.can_accept_work() {
return false;
}
self.work_queue.push(Task::new(work))
}
pub fn schedule_work_with_priority<F>(&self, priority: Priority, work: F) -> bool
where
F: FnOnce() + Send + 'static,
{
if !self.state.can_accept_work() {
return false;
}
self.work_queue.push(Task::with_priority(priority, work))
}
pub fn schedule_task(&self, task: Task) -> bool {
if !self.state.can_accept_work() {
return false;
}
self.work_queue.push(task)
}
pub fn schedule_delayed<F>(&self, delay: Duration, work: F) -> TimerHandle
where
F: FnOnce() + Send + 'static,
{
self.timer_wheel
.schedule_oneshot(delay, Priority::NORMAL, work)
}
pub fn schedule_periodic<F>(&self, interval: Duration, work: F) -> TimerHandle
where
F: Fn() + Send + Sync + 'static,
{
self.timer_wheel
.schedule_periodic(interval, Priority::NORMAL, work)
}
pub fn cancel_timer(&self, id: TimerId) -> bool {
self.timer_wheel.cancel(id)
}
#[must_use]
pub const fn timer_wheel(&self) -> &Arc<TimerWheel> {
&self.timer_wheel
}
pub fn queue_event(&self, event: DynEvent) -> bool {
self.event_queue.push(event)
}
pub const fn request_render(&mut self) {
self.render_pending = true;
}
pub const fn take_render_pending(&mut self) -> bool {
let pending = self.render_pending;
self.render_pending = false;
pending
}
#[inline]
#[must_use]
pub const fn is_render_pending(&self) -> bool {
self.render_pending
}
pub fn set_scope(&mut self, scope: EventScope) {
self.current_scope = Some(scope);
}
pub fn clear_scope(&mut self) {
self.current_scope = None;
}
#[must_use]
pub const fn current_scope(&self) -> Option<&EventScope> {
self.current_scope.as_ref()
}
pub fn shutdown(&mut self) {
if self.state == RuntimeState::Running {
self.state = RuntimeState::Stopping;
}
}
pub const fn emergency_stop(&mut self) {
self.state = RuntimeState::Emergency;
}
#[must_use]
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn is_idle(&self) -> bool {
self.event_queue.is_empty()
&& self.work_queue.is_empty()
&& self.event_bus.queue_is_empty()
&& self.timer_wheel.pending_count() == 0
}
#[must_use]
pub fn stats(&self) -> RuntimeStats {
RuntimeStats {
state: self.state,
event_queue_len: self.event_queue.len(),
work_queue_len: self.work_queue.len(),
work_dropped: self.work_queue.dropped_count(),
tasks_executed: self.executor.executed_count(),
tasks_failed: self.executor.failed_count(),
active_timers: self.timer_wheel.pending_count(),
timers_dropped: self.timer_wheel.dropped_count(),
}
}
#[must_use]
pub const fn work_queue(&self) -> &Arc<WorkQueue> {
&self.work_queue
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn run(&mut self) {
while self.tick() {}
}
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for Runtime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Runtime")
.field("state", &self.state)
.field("event_queue_len", &self.event_queue.len())
.field("work_queue_len", &self.work_queue.len())
.field("render_pending", &self.render_pending)
.field("has_scope", &self.current_scope.is_some())
.finish_non_exhaustive()
}
}