use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use crate::driver::{Driver, DriverFactory, DriverType};
use crate::scheduler::{Scheduler, SchedulerConfig, SchedulerHandle};
use crate::time::{Duration, Instant};
thread_local! {
static CURRENT_HANDLE: std::cell::RefCell<Option<Handle>> = const { std::cell::RefCell::new(None) };
}
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
pub scheduler: SchedulerConfig,
pub driver_type: DriverType,
pub driver_io: crate::driver::DriverConfig,
pub enable_parking: bool,
pub park_timeout: Duration,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self {
scheduler: SchedulerConfig::default(),
driver_type: DriverType::Auto,
driver_io: crate::driver::DriverConfig::default(),
enable_parking: true,
park_timeout: Duration::from_millis(100),
}
}
}
pub struct RuntimeBuilder {
config: RuntimeConfig,
}
impl RuntimeBuilder {
pub fn new() -> Self {
Self {
config: RuntimeConfig::default(),
}
}
pub fn worker_threads(mut self, count: usize) -> Self {
self.config.scheduler.queue_size = count * 256;
self.config.scheduler.thread_name = "hiver-worker".to_string();
self
}
pub fn queue_size(mut self, size: usize) -> Self {
self.config.scheduler.queue_size = size;
self
}
pub fn thread_name(mut self, name: impl Into<String>) -> Self {
self.config.scheduler.thread_name = name.into();
self
}
pub fn driver_type(mut self, driver_type: DriverType) -> Self {
self.config.driver_type = driver_type;
self
}
pub fn io_entries(mut self, entries: u32) -> Self {
self.config.driver_io.entries = entries;
self
}
pub fn enable_parking(mut self, enable: bool) -> Self {
self.config.enable_parking = enable;
self
}
pub fn park_timeout(mut self, timeout: Duration) -> Self {
self.config.park_timeout = timeout;
self
}
pub fn build(self) -> io::Result<Runtime> {
Runtime::with_config(self.config)
}
}
impl Default for RuntimeBuilder {
fn default() -> Self {
Self::new()
}
}
pub struct Runtime {
scheduler: Scheduler,
driver: Arc<dyn Driver>,
config: RuntimeConfig,
main_waker: Option<Waker>,
last_timer_advance: Instant,
}
impl Runtime {
pub fn new() -> io::Result<Self> {
Self::with_config(RuntimeConfig::default())
}
pub fn builder() -> RuntimeBuilder {
RuntimeBuilder::new()
}
pub fn with_config(config: RuntimeConfig) -> io::Result<Self> {
let driver =
DriverFactory::create_with_config(config.driver_type, config.driver_io.clone())?;
let scheduler = Scheduler::with_config_and_driver(&config.scheduler, driver.clone())?;
Ok(Self {
scheduler,
driver,
config,
main_waker: None,
last_timer_advance: Instant::now(),
})
}
pub fn block_on<F: Future<Output = ()>>(&mut self, future: F) -> io::Result<()> {
let handle = Handle {
scheduler_handle: self.scheduler.handle(),
};
Handle::set_current(Some(handle));
let mut future = Box::pin(future);
let handle = self.scheduler.handle();
let waker = handle.waker();
let mut context = Context::from_waker(&waker);
self.main_waker = Some(waker.clone());
let result = loop {
match Pin::new(&mut future).poll(&mut context) {
Poll::Ready(()) => {
let _ = self.flush_events();
break Ok(());
},
Poll::Pending => {
self.run_once()?;
},
}
};
Handle::set_current(None);
result
}
fn run_once(&mut self) -> io::Result<()> {
let _ = self.driver.submit();
let timeout = if self.config.enable_parking {
Some(self.config.park_timeout)
} else {
None
};
if let Some(to) = timeout {
let (_events, timed_out) = self.driver.wait_timeout(to)?;
if timed_out {
}
} else {
let _events = self.driver.wait()?;
}
self.process_completions();
self.advance_timers();
Ok(())
}
fn process_completions(&mut self) {
while let Some(completion) = self.driver.get_completion() {
if let Some(waker) = self.scheduler.get_task_waker(completion.user_data) {
waker.wake();
}
self.driver.advance_completion();
}
}
fn advance_timers(&mut self) {
use crate::time::global_timer;
let now = Instant::now();
let elapsed = now.duration_since(self.last_timer_advance);
let ticks_to_advance = elapsed.as_millis() as u64;
if ticks_to_advance > 0 {
let _expired = global_timer().advance(ticks_to_advance);
self.last_timer_advance = now;
}
}
fn flush_events(&mut self) -> io::Result<()> {
let _ = self.driver.submit();
let _ = self.driver.wait_timeout(Duration::from_millis(0))?;
self.process_completions();
Ok(())
}
}
#[derive(Clone)]
pub struct Handle {
scheduler_handle: SchedulerHandle,
}
impl Handle {
#[allow(clippy::expect_used)]
pub fn current() -> Self {
Self::try_current().expect("Handle::current() called outside of a runtime context")
}
pub fn try_current() -> Option<Self> {
CURRENT_HANDLE.with(|h| h.borrow().clone())
}
fn set_current(handle: Option<Handle>) {
CURRENT_HANDLE.with(|h| *h.borrow_mut() = handle);
}
pub fn scheduler(&self) -> &SchedulerHandle {
&self.scheduler_handle
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_runtime_config_default() {
let config = RuntimeConfig::default();
assert_eq!(config.scheduler.queue_size, 256);
assert!(config.enable_parking);
assert_eq!(config.park_timeout.as_millis(), 100);
}
#[test]
fn test_runtime_builder() {
let builder = RuntimeBuilder::new()
.worker_threads(4)
.queue_size(512)
.thread_name("test-worker")
.enable_parking(false);
assert_eq!(builder.config.scheduler.queue_size, 512);
assert_eq!(builder.config.scheduler.thread_name, "test-worker");
assert!(!builder.config.enable_parking);
}
#[test]
fn test_runtime_builder_driver_config() {
let builder = RuntimeBuilder::new()
.driver_type(DriverType::Auto)
.io_entries(512)
.park_timeout(Duration::from_millis(50));
assert_eq!(builder.config.driver_io.entries, 512);
assert_eq!(builder.config.park_timeout.as_millis(), 50);
}
#[test]
fn test_runtime_creation() {
let runtime = Runtime::new();
#[cfg(any(
target_os = "linux",
target_os = "macos",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly"
))]
{
assert!(runtime.is_ok());
}
}
#[test]
fn test_block_on_simple() {
let mut runtime = Runtime::new().unwrap();
let result = runtime.block_on(async {});
assert!(result.is_ok());
}
#[test]
fn test_spawn_executes_through_scheduler() {
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
let mut runtime = Runtime::new().unwrap();
let counter = Arc::new(AtomicI32::new(0));
let counter_clone = counter.clone();
runtime
.block_on(async move {
let handle = crate::task::spawn(async move {
counter_clone.store(42, Ordering::SeqCst);
});
let _ = handle.wait().await;
})
.unwrap();
assert_eq!(counter.load(Ordering::SeqCst), 42);
}
#[test]
fn test_spawn_returns_value() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let handle = crate::task::spawn(async { 42i32 });
let result = handle.wait().await.unwrap();
assert_eq!(result, 42);
})
.unwrap();
}
#[test]
fn test_multiple_spawns() {
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
let mut runtime = Runtime::new().unwrap();
let counter = Arc::new(AtomicI32::new(0));
runtime
.block_on(async {
let mut handles = vec![];
for _ in 0..10 {
let c = counter.clone();
handles.push(crate::task::spawn(async move {
c.fetch_add(1, Ordering::SeqCst);
}));
}
for h in handles {
let _ = h.wait().await;
}
})
.unwrap();
assert_eq!(counter.load(Ordering::SeqCst), 10);
}
#[test]
fn test_spawn_with_async_computation() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let h1 = crate::task::spawn(async { 1i32 });
let h2 = crate::task::spawn(async { 2i32 });
let h3 = crate::task::spawn(async { 3i32 });
let sum =
h1.wait().await.unwrap() + h2.wait().await.unwrap() + h3.wait().await.unwrap();
assert_eq!(sum, 6);
})
.unwrap();
}
#[test]
fn test_spawn_join_handle_id() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let h1 = crate::task::spawn(async { 1i32 });
let h2 = crate::task::spawn(async { 2i32 });
assert_ne!(h1.id(), 0);
assert_ne!(h2.id(), 0);
assert_ne!(h1.id(), h2.id());
let _ = h1.wait().await;
let _ = h2.wait().await;
})
.unwrap();
}
#[test]
fn test_spawn_join_handle_is_finished() {
let mut runtime = Runtime::new().unwrap();
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
let flag = Arc::new(AtomicBool::new(false));
let flag_clone = flag.clone();
runtime
.block_on(async move {
let handle = crate::task::spawn(async move {
flag_clone.store(true, Ordering::SeqCst);
});
let _ = handle.wait().await;
assert!(flag.load(Ordering::SeqCst));
})
.unwrap();
}
#[test]
fn test_spawn_string_return() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let handle = crate::task::spawn(async { String::from("hello") });
let result = handle.wait().await.unwrap();
assert_eq!(result, "hello");
})
.unwrap();
}
#[test]
fn test_spawn_vec_return() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let handle = crate::task::spawn(async { vec![1, 2, 3] });
let result = handle.wait().await.unwrap();
assert_eq!(result, vec![1, 2, 3]);
})
.unwrap();
}
#[test]
fn test_spawn_tuple_return() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let handle = crate::task::spawn(async { (42i32, true, "test".to_string()) });
let result = handle.wait().await.unwrap();
assert_eq!(result, (42, true, "test".to_string()));
})
.unwrap();
}
#[test]
fn test_spawn_unit_return() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let handle: crate::task::JoinHandle<()> = crate::task::spawn(async {});
let result = handle.wait().await;
assert!(result.is_ok());
})
.unwrap();
}
#[test]
fn test_spawn_option_return() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let handle = crate::task::spawn(async { Some(42i32) });
let result = handle.wait().await.unwrap();
assert_eq!(result, Some(42));
})
.unwrap();
}
#[test]
fn test_nested_spawn() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let handle = crate::task::spawn(async {
let inner = crate::task::spawn(async { 10i32 });
inner.wait().await.unwrap()
});
let result = handle.wait().await.unwrap();
assert_eq!(result, 10);
})
.unwrap();
}
#[test]
fn test_handle_current_and_try_current() {
let mut runtime = Runtime::new().unwrap();
runtime
.block_on(async {
let handle = Handle::current();
assert!(Handle::try_current().is_some());
let _scheduler = handle.scheduler();
})
.unwrap();
assert!(Handle::try_current().is_none());
}
#[test]
#[should_panic(expected = "outside of a runtime context")]
fn test_handle_current_panics_outside_runtime() {
let _ = Handle::current();
}
#[test]
fn test_block_on_with_config() {
let config = RuntimeConfig {
park_timeout: Duration::from_millis(10),
..RuntimeConfig::default()
};
let mut runtime = Runtime::with_config(config).unwrap();
let result = runtime.block_on(async {});
assert!(result.is_ok());
}
}