1use std::future::Future;
26use std::io;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll, Waker};
30
31use crate::driver::{Driver, DriverFactory, DriverType};
32use crate::scheduler::{Scheduler, SchedulerConfig, SchedulerHandle};
33use crate::time::{Duration, Instant};
34
35thread_local! {
36 static CURRENT_HANDLE: std::cell::RefCell<Option<Handle>> = const { std::cell::RefCell::new(None) };
37}
38
39#[derive(Debug, Clone)]
44pub struct RuntimeConfig {
45 pub scheduler: SchedulerConfig,
47 pub driver_type: DriverType,
49 pub driver_io: crate::driver::DriverConfig,
51 pub enable_parking: bool,
53 pub park_timeout: Duration,
55}
56
57impl Default for RuntimeConfig {
58 fn default() -> Self {
59 Self {
60 scheduler: SchedulerConfig::default(),
61 driver_type: DriverType::Auto,
62 driver_io: crate::driver::DriverConfig::default(),
63 enable_parking: true,
64 park_timeout: Duration::from_millis(100),
65 }
66 }
67}
68
69pub struct RuntimeBuilder {
86 config: RuntimeConfig,
87}
88
89impl RuntimeBuilder {
90 pub fn new() -> Self {
93 Self {
94 config: RuntimeConfig::default(),
95 }
96 }
97
98 pub fn worker_threads(mut self, count: usize) -> Self {
101 self.config.scheduler.queue_size = count * 256;
102 self.config.scheduler.thread_name = "hiver-worker".to_string();
103 self
104 }
105
106 pub fn queue_size(mut self, size: usize) -> Self {
109 self.config.scheduler.queue_size = size;
110 self
111 }
112
113 pub fn thread_name(mut self, name: impl Into<String>) -> Self {
116 self.config.scheduler.thread_name = name.into();
117 self
118 }
119
120 pub fn driver_type(mut self, driver_type: DriverType) -> Self {
123 self.config.driver_type = driver_type;
124 self
125 }
126
127 pub fn io_entries(mut self, entries: u32) -> Self {
130 self.config.driver_io.entries = entries;
131 self
132 }
133
134 pub fn enable_parking(mut self, enable: bool) -> Self {
137 self.config.enable_parking = enable;
138 self
139 }
140
141 pub fn park_timeout(mut self, timeout: Duration) -> Self {
144 self.config.park_timeout = timeout;
145 self
146 }
147
148 pub fn build(self) -> io::Result<Runtime> {
156 Runtime::with_config(self.config)
157 }
158}
159
160impl Default for RuntimeBuilder {
161 fn default() -> Self {
162 Self::new()
163 }
164}
165
166pub struct Runtime {
185 scheduler: Scheduler,
187 driver: Arc<dyn Driver>,
189 config: RuntimeConfig,
191 main_waker: Option<Waker>,
193 last_timer_advance: Instant,
195}
196
197impl Runtime {
198 pub fn new() -> io::Result<Self> {
206 Self::with_config(RuntimeConfig::default())
207 }
208
209 pub fn builder() -> RuntimeBuilder {
212 RuntimeBuilder::new()
213 }
214
215 pub fn with_config(config: RuntimeConfig) -> io::Result<Self> {
225 let driver =
228 DriverFactory::create_with_config(config.driver_type, config.driver_io.clone())?;
229
230 let scheduler = Scheduler::with_config_and_driver(&config.scheduler, driver.clone())?;
233
234 Ok(Self {
235 scheduler,
236 driver,
237 config,
238 main_waker: None,
239 last_timer_advance: Instant::now(),
240 })
241 }
242
243 pub fn block_on<F: Future<Output = ()>>(&mut self, future: F) -> io::Result<()> {
265 let handle = Handle {
268 scheduler_handle: self.scheduler.handle(),
269 };
270 Handle::set_current(Some(handle));
271
272 let mut future = Box::pin(future);
275
276 let handle = self.scheduler.handle();
279 let waker = handle.waker();
280 let mut context = Context::from_waker(&waker);
281 self.main_waker = Some(waker.clone());
282
283 let result = loop {
286 match Pin::new(&mut future).poll(&mut context) {
289 Poll::Ready(()) => {
290 let _ = self.flush_events();
293 break Ok(());
294 },
295 Poll::Pending => {
296 self.run_once()?;
299 },
300 }
301 };
302
303 Handle::set_current(None);
306
307 result
308 }
309
310 fn run_once(&mut self) -> io::Result<()> {
313 let _ = self.driver.submit();
316
317 let timeout = if self.config.enable_parking {
320 Some(self.config.park_timeout)
321 } else {
322 None
323 };
324
325 if let Some(to) = timeout {
326 let (_events, timed_out) = self.driver.wait_timeout(to)?;
327 if timed_out {
328 }
331 } else {
332 let _events = self.driver.wait()?;
333 }
334
335 self.process_completions();
338
339 self.advance_timers();
342
343 Ok(())
344 }
345
346 fn process_completions(&mut self) {
349 while let Some(completion) = self.driver.get_completion() {
350 if let Some(waker) = self.scheduler.get_task_waker(completion.user_data) {
353 waker.wake();
354 }
355 self.driver.advance_completion();
356 }
357 }
358
359 fn advance_timers(&mut self) {
362 use crate::time::global_timer;
363
364 let now = Instant::now();
365 let elapsed = now.duration_since(self.last_timer_advance);
366
367 let ticks_to_advance = elapsed.as_millis() as u64;
370
371 if ticks_to_advance > 0 {
372 let _expired = global_timer().advance(ticks_to_advance);
373 self.last_timer_advance = now;
374 }
375 }
376
377 fn flush_events(&mut self) -> io::Result<()> {
380 let _ = self.driver.submit();
383
384 let _ = self.driver.wait_timeout(Duration::from_millis(0))?;
387
388 self.process_completions();
391
392 Ok(())
393 }
394}
395
396#[derive(Clone)]
402pub struct Handle {
403 scheduler_handle: SchedulerHandle,
405}
406
407impl Handle {
408 #[allow(clippy::expect_used)]
416 pub fn current() -> Self {
417 Self::try_current().expect("Handle::current() called outside of a runtime context")
418 }
419
420 pub fn try_current() -> Option<Self> {
423 CURRENT_HANDLE.with(|h| h.borrow().clone())
424 }
425
426 fn set_current(handle: Option<Handle>) {
429 CURRENT_HANDLE.with(|h| *h.borrow_mut() = handle);
430 }
431
432 pub fn scheduler(&self) -> &SchedulerHandle {
435 &self.scheduler_handle
436 }
437}
438
439#[cfg(test)]
440mod tests {
441 use super::*;
442
443 #[test]
444 fn test_runtime_config_default() {
445 let config = RuntimeConfig::default();
446 assert_eq!(config.scheduler.queue_size, 256);
447 assert!(config.enable_parking);
448 assert_eq!(config.park_timeout.as_millis(), 100);
449 }
450
451 #[test]
452 fn test_runtime_builder() {
453 let builder = RuntimeBuilder::new()
454 .worker_threads(4)
455 .queue_size(512)
456 .thread_name("test-worker")
457 .enable_parking(false);
458
459 assert_eq!(builder.config.scheduler.queue_size, 512);
460 assert_eq!(builder.config.scheduler.thread_name, "test-worker");
461 assert!(!builder.config.enable_parking);
462 }
463
464 #[test]
465 fn test_runtime_builder_driver_config() {
466 let builder = RuntimeBuilder::new()
467 .driver_type(DriverType::Auto)
468 .io_entries(512)
469 .park_timeout(Duration::from_millis(50));
470
471 assert_eq!(builder.config.driver_io.entries, 512);
472 assert_eq!(builder.config.park_timeout.as_millis(), 50);
473 }
474
475 #[test]
476 fn test_runtime_creation() {
477 let runtime = Runtime::new();
478 #[cfg(any(
479 target_os = "linux",
480 target_os = "macos",
481 target_os = "freebsd",
482 target_os = "netbsd",
483 target_os = "openbsd",
484 target_os = "dragonfly"
485 ))]
486 {
487 assert!(runtime.is_ok());
488 }
489 }
490
491 #[test]
492 fn test_block_on_simple() {
493 let mut runtime = Runtime::new().unwrap();
494 let result = runtime.block_on(async {});
495 assert!(result.is_ok());
496 }
497
498 #[test]
499 fn test_spawn_executes_through_scheduler() {
500 use std::sync::Arc;
501 use std::sync::atomic::{AtomicI32, Ordering};
502
503 let mut runtime = Runtime::new().unwrap();
504 let counter = Arc::new(AtomicI32::new(0));
505 let counter_clone = counter.clone();
506
507 runtime
508 .block_on(async move {
509 let handle = crate::task::spawn(async move {
510 counter_clone.store(42, Ordering::SeqCst);
511 });
512 let _ = handle.wait().await;
513 })
514 .unwrap();
515
516 assert_eq!(counter.load(Ordering::SeqCst), 42);
517 }
518
519 #[test]
520 fn test_spawn_returns_value() {
521 let mut runtime = Runtime::new().unwrap();
522
523 runtime
524 .block_on(async {
525 let handle = crate::task::spawn(async { 42i32 });
526 let result = handle.wait().await.unwrap();
527 assert_eq!(result, 42);
528 })
529 .unwrap();
530 }
531
532 #[test]
533 fn test_multiple_spawns() {
534 use std::sync::Arc;
535 use std::sync::atomic::{AtomicI32, Ordering};
536
537 let mut runtime = Runtime::new().unwrap();
538 let counter = Arc::new(AtomicI32::new(0));
539
540 runtime
541 .block_on(async {
542 let mut handles = vec![];
543 for _ in 0..10 {
544 let c = counter.clone();
545 handles.push(crate::task::spawn(async move {
546 c.fetch_add(1, Ordering::SeqCst);
547 }));
548 }
549 for h in handles {
550 let _ = h.wait().await;
551 }
552 })
553 .unwrap();
554
555 assert_eq!(counter.load(Ordering::SeqCst), 10);
556 }
557
558 #[test]
559 fn test_spawn_with_async_computation() {
560 let mut runtime = Runtime::new().unwrap();
561
562 runtime
563 .block_on(async {
564 let h1 = crate::task::spawn(async { 1i32 });
565 let h2 = crate::task::spawn(async { 2i32 });
566 let h3 = crate::task::spawn(async { 3i32 });
567
568 let sum =
569 h1.wait().await.unwrap() + h2.wait().await.unwrap() + h3.wait().await.unwrap();
570
571 assert_eq!(sum, 6);
572 })
573 .unwrap();
574 }
575
576 #[test]
577 fn test_spawn_join_handle_id() {
578 let mut runtime = Runtime::new().unwrap();
579
580 runtime
581 .block_on(async {
582 let h1 = crate::task::spawn(async { 1i32 });
583 let h2 = crate::task::spawn(async { 2i32 });
584 assert_ne!(h1.id(), 0);
585 assert_ne!(h2.id(), 0);
586 assert_ne!(h1.id(), h2.id());
587 let _ = h1.wait().await;
588 let _ = h2.wait().await;
589 })
590 .unwrap();
591 }
592
593 #[test]
594 fn test_spawn_join_handle_is_finished() {
595 let mut runtime = Runtime::new().unwrap();
596 use std::sync::Arc;
597 use std::sync::atomic::{AtomicBool, Ordering};
598
599 let flag = Arc::new(AtomicBool::new(false));
600 let flag_clone = flag.clone();
601
602 runtime
603 .block_on(async move {
604 let handle = crate::task::spawn(async move {
605 flag_clone.store(true, Ordering::SeqCst);
606 });
607 let _ = handle.wait().await;
608 assert!(flag.load(Ordering::SeqCst));
610 })
611 .unwrap();
612 }
613
614 #[test]
615 fn test_spawn_string_return() {
616 let mut runtime = Runtime::new().unwrap();
617
618 runtime
619 .block_on(async {
620 let handle = crate::task::spawn(async { String::from("hello") });
621 let result = handle.wait().await.unwrap();
622 assert_eq!(result, "hello");
623 })
624 .unwrap();
625 }
626
627 #[test]
628 fn test_spawn_vec_return() {
629 let mut runtime = Runtime::new().unwrap();
630
631 runtime
632 .block_on(async {
633 let handle = crate::task::spawn(async { vec![1, 2, 3] });
634 let result = handle.wait().await.unwrap();
635 assert_eq!(result, vec![1, 2, 3]);
636 })
637 .unwrap();
638 }
639
640 #[test]
641 fn test_spawn_tuple_return() {
642 let mut runtime = Runtime::new().unwrap();
643
644 runtime
645 .block_on(async {
646 let handle = crate::task::spawn(async { (42i32, true, "test".to_string()) });
647 let result = handle.wait().await.unwrap();
648 assert_eq!(result, (42, true, "test".to_string()));
649 })
650 .unwrap();
651 }
652
653 #[test]
654 fn test_spawn_unit_return() {
655 let mut runtime = Runtime::new().unwrap();
656
657 runtime
658 .block_on(async {
659 let handle: crate::task::JoinHandle<()> = crate::task::spawn(async {});
660 let result = handle.wait().await;
661 assert!(result.is_ok());
662 })
663 .unwrap();
664 }
665
666 #[test]
667 fn test_spawn_option_return() {
668 let mut runtime = Runtime::new().unwrap();
669
670 runtime
671 .block_on(async {
672 let handle = crate::task::spawn(async { Some(42i32) });
673 let result = handle.wait().await.unwrap();
674 assert_eq!(result, Some(42));
675 })
676 .unwrap();
677 }
678
679 #[test]
680 fn test_nested_spawn() {
681 let mut runtime = Runtime::new().unwrap();
682
683 runtime
684 .block_on(async {
685 let handle = crate::task::spawn(async {
686 let inner = crate::task::spawn(async { 10i32 });
687 inner.wait().await.unwrap()
688 });
689 let result = handle.wait().await.unwrap();
690 assert_eq!(result, 10);
691 })
692 .unwrap();
693 }
694
695 #[test]
696 fn test_handle_current_and_try_current() {
697 let mut runtime = Runtime::new().unwrap();
698
699 runtime
700 .block_on(async {
701 let handle = Handle::current();
703 assert!(Handle::try_current().is_some());
704
705 let _scheduler = handle.scheduler();
707 })
708 .unwrap();
709
710 assert!(Handle::try_current().is_none());
712 }
713
714 #[test]
715 #[should_panic(expected = "outside of a runtime context")]
716 fn test_handle_current_panics_outside_runtime() {
717 let _ = Handle::current();
718 }
719
720 #[test]
721 fn test_block_on_with_config() {
722 let config = RuntimeConfig {
723 park_timeout: Duration::from_millis(10),
724 ..RuntimeConfig::default()
725 };
726 let mut runtime = Runtime::with_config(config).unwrap();
727 let result = runtime.block_on(async {});
728 assert!(result.is_ok());
729 }
730}