Skip to main content

hiver_runtime/
runtime.rs

1//! Runtime module
2//! 运行时模块
3//!
4//! # Overview / 概述
5//!
6//! This module provides the main runtime implementation that brings together
7//! the scheduler, driver, and timer components.
8//!
9//! 本模块提供了主要的运行时实现,将调度器、驱动和定时器组件组合在一起。
10//!
11//! # Example / 示例
12//!
13//! ```rust,no_run,ignore
14//! use hiver_runtime::Runtime;
15//!
//! fn main() -> std::io::Result<()> {
//!     let mut runtime = Runtime::new()?;
//!     runtime.block_on(async {
//!         println!("Hello from Hiver!");
//!     })?;
//!     Ok(())
//! }
23//! ```
24
25use std::future::Future;
26use std::io;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll, Waker};
30
31use crate::driver::{Driver, DriverFactory, DriverType};
32use crate::scheduler::{Scheduler, SchedulerConfig, SchedulerHandle};
33use crate::time::{Duration, Instant};
34
// Per-thread slot holding the `Handle` of the runtime that is currently running
// `block_on` on this thread. It starts out as `None`, is written only through
// `Handle::set_current`, and is read through `Handle::try_current`.
thread_local! {
    static CURRENT_HANDLE: std::cell::RefCell<Option<Handle>> = const { std::cell::RefCell::new(None) };
}
38
/// Runtime configuration.
///
/// Bundles the scheduler settings, the driver selection and its I/O settings,
/// and the parking behaviour used by the runtime's event loop.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
    /// Scheduler configuration, passed to the scheduler when the runtime is built.
    pub scheduler: SchedulerConfig,
    /// Which driver implementation the driver factory should create.
    pub driver_type: DriverType,
    /// I/O configuration handed to the driver factory together with `driver_type`.
    pub driver_io: crate::driver::DriverConfig,
    /// When `true`, each event-loop iteration waits on the driver for at most
    /// `park_timeout`; when `false`, it waits on the driver without a timeout.
    pub enable_parking: bool,
    /// Upper bound for a single driver wait when `enable_parking` is `true`.
    pub park_timeout: Duration,
}
56
57impl Default for RuntimeConfig {
58    fn default() -> Self {
59        Self {
60            scheduler: SchedulerConfig::default(),
61            driver_type: DriverType::Auto,
62            driver_io: crate::driver::DriverConfig::default(),
63            enable_parking: true,
64            park_timeout: Duration::from_millis(100),
65        }
66    }
67}
68
69/// Runtime builder / 运行时构建器
70///
71/// Provides a fluent API for configuring and building a runtime.
72/// 提供用于配置和构建运行时的流畅API。
73///
74/// # Example / 示例
75///
76/// ```rust,no_run,ignore
77/// use hiver_runtime::Runtime;
78///
79/// let runtime = Runtime::builder()
80///     .worker_threads(4)
81///     .queue_size(512)
82///     .build()
83///     .unwrap();
84/// ```
85pub struct RuntimeBuilder {
86    config: RuntimeConfig,
87}
88
89impl RuntimeBuilder {
90    /// Create a new runtime builder with default configuration
91    /// 使用默认配置创建新的运行时构建器
92    pub fn new() -> Self {
93        Self {
94            config: RuntimeConfig::default(),
95        }
96    }
97
98    /// Set the number of worker threads
99    /// 设置工作线程数量
100    pub fn worker_threads(mut self, count: usize) -> Self {
101        self.config.scheduler.queue_size = count * 256;
102        self.config.scheduler.thread_name = "hiver-worker".to_string();
103        self
104    }
105
106    /// Set the queue size for the scheduler
107    /// 设置调度器的队列大小
108    pub fn queue_size(mut self, size: usize) -> Self {
109        self.config.scheduler.queue_size = size;
110        self
111    }
112
113    /// Set the thread name pattern
114    /// 设置线程名称模式
115    pub fn thread_name(mut self, name: impl Into<String>) -> Self {
116        self.config.scheduler.thread_name = name.into();
117        self
118    }
119
120    /// Set the driver type
121    /// 设置driver类型
122    pub fn driver_type(mut self, driver_type: DriverType) -> Self {
123        self.config.driver_type = driver_type;
124        self
125    }
126
127    /// Set the I/O driver queue depth
128    /// 设置I/O驱动队列深度
129    pub fn io_entries(mut self, entries: u32) -> Self {
130        self.config.driver_io.entries = entries;
131        self
132    }
133
134    /// Enable or disable thread parking
135    /// 启用或禁用线程休眠
136    pub fn enable_parking(mut self, enable: bool) -> Self {
137        self.config.enable_parking = enable;
138        self
139    }
140
141    /// Set the park timeout
142    /// 设置休眠超时
143    pub fn park_timeout(mut self, timeout: Duration) -> Self {
144        self.config.park_timeout = timeout;
145        self
146    }
147
148    /// Build the runtime
149    /// 构建运行时
150    ///
151    /// # Errors / 错误
152    ///
153    /// Returns an error if runtime initialization fails.
154    /// 如果运行时初始化失败则返回错误。
155    pub fn build(self) -> io::Result<Runtime> {
156        Runtime::with_config(self.config)
157    }
158}
159
160impl Default for RuntimeBuilder {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
/// The async runtime.
///
/// Main entry point for the async runtime. Owns the scheduler, shares the
/// driver with it, and advances the global timer from its event loop.
///
/// # Example
///
/// ```rust,no_run,ignore
/// use hiver_runtime::Runtime;
///
/// fn main() -> std::io::Result<()> {
///     // `block_on` takes `&mut self`, so the binding must be mutable.
///     let mut runtime = Runtime::new()?;
///     runtime.block_on(async {
///         println!("Hello, world!");
///     })?;
///     Ok(())
/// }
/// ```
pub struct Runtime {
    /// The scheduler.
    scheduler: Scheduler,
    /// The driver; the scheduler is given a clone of this handle at construction.
    driver: Arc<dyn Driver>,
    /// Runtime configuration.
    config: RuntimeConfig,
    /// Waker for the main task; `None` until `block_on` stores one.
    main_waker: Option<Waker>,
    /// Last time the timer was advanced.
    last_timer_advance: Instant,
}
196
197impl Runtime {
198    /// Create a new runtime with default configuration
199    /// 使用默认配置创建新的运行时
200    ///
201    /// # Errors / 错误
202    ///
203    /// Returns an error if runtime initialization fails.
204    /// 如果运行时初始化失败则返回错误。
205    pub fn new() -> io::Result<Self> {
206        Self::with_config(RuntimeConfig::default())
207    }
208
209    /// Create a runtime builder
210    /// 创建运行时构建器
211    pub fn builder() -> RuntimeBuilder {
212        RuntimeBuilder::new()
213    }
214
215    /// Create a new runtime with the specified configuration
216    /// 使用指定配置创建新的运行时
217    ///
218    /// # Errors / 错误
219    ///
220    /// Returns an error if:
221    /// 返回错误如果:
222    /// - Driver creation fails / Driver创建失败
223    /// - Scheduler creation fails / 调度器创建失败
224    pub fn with_config(config: RuntimeConfig) -> io::Result<Self> {
225        // Create the driver
226        // 创建driver
227        let driver =
228            DriverFactory::create_with_config(config.driver_type, config.driver_io.clone())?;
229
230        // Create the scheduler with the driver
231        // 使用driver创建调度器
232        let scheduler = Scheduler::with_config_and_driver(&config.scheduler, driver.clone())?;
233
234        Ok(Self {
235            scheduler,
236            driver,
237            config,
238            main_waker: None,
239            last_timer_advance: Instant::now(),
240        })
241    }
242
243    /// Run a future to completion on this runtime
244    /// 在此运行时上运行future到完成
245    ///
246    /// This is the main entry point for executing async code.
247    /// 这是执行异步代码的主入口点。
248    ///
249    /// # Errors / 错误
250    ///
251    /// Returns an error if the future returns an error.
252    /// 如果future返回错误则返回错误。
253    ///
254    /// # Example / 示例
255    ///
256    /// ```rust,no_run,ignore
257    /// use hiver_runtime::Runtime;
258    ///
259    /// let runtime = Runtime::new().unwrap();
260    /// runtime.block_on(async {
261    ///     println!("Hello, world!");
262    /// });
263    /// ```
264    pub fn block_on<F: Future<Output = ()>>(&mut self, future: F) -> io::Result<()> {
265        // Set the current runtime handle for this thread
266        // 为当前线程设置运行时句柄
267        let handle = Handle {
268            scheduler_handle: self.scheduler.handle(),
269        };
270        Handle::set_current(Some(handle));
271
272        // Pin the future
273        // Pin future
274        let mut future = Box::pin(future);
275
276        // Create a waker for the main task
277        // 为主任务创建waker
278        let handle = self.scheduler.handle();
279        let waker = handle.waker();
280        let mut context = Context::from_waker(&waker);
281        self.main_waker = Some(waker.clone());
282
283        // Run the event loop
284        // 运行事件循环
285        let result = loop {
286            // Poll the future
287            // 轮询future
288            match Pin::new(&mut future).poll(&mut context) {
289                Poll::Ready(()) => {
290                    // Future completed, flush any remaining events
291                    // Future完成,刷新任何剩余事件
292                    let _ = self.flush_events();
293                    break Ok(());
294                },
295                Poll::Pending => {
296                    // Future is not ready, run the event loop
297                    // Future未就绪,运行事件循环
298                    self.run_once()?;
299                },
300            }
301        };
302
303        // Clear the thread-local handle
304        // 清除线程本地句柄
305        Handle::set_current(None);
306
307        result
308    }
309
310    /// Run a single iteration of the event loop
311    /// 运行事件循环的单次迭代
312    fn run_once(&mut self) -> io::Result<()> {
313        // Submit any pending I/O operations
314        // 提交任何挂起的I/O操作
315        let _ = self.driver.submit();
316
317        // Wait for events with timeout
318        // 带超时等待事件
319        let timeout = if self.config.enable_parking {
320            Some(self.config.park_timeout)
321        } else {
322            None
323        };
324
325        if let Some(to) = timeout {
326            let (_events, timed_out) = self.driver.wait_timeout(to)?;
327            if timed_out {
328                // Timeout occurred, this is normal for idle periods
329                // 超时发生,这对空闲期是正常的
330            }
331        } else {
332            let _events = self.driver.wait()?;
333        }
334
335        // Process completions
336        // 处理完成事件
337        self.process_completions();
338
339        // Advance the timer wheel
340        // 推进时间轮
341        self.advance_timers();
342
343        Ok(())
344    }
345
346    /// Process completion events from the driver
347    /// 处理来自driver的完成事件
348    fn process_completions(&mut self) {
349        while let Some(completion) = self.driver.get_completion() {
350            // Notify the task associated with this completion
351            // 通知与此完成关联的任务
352            if let Some(waker) = self.scheduler.get_task_waker(completion.user_data) {
353                waker.wake();
354            }
355            self.driver.advance_completion();
356        }
357    }
358
359    /// Advance the timer wheel and wake expired timers
360    /// 推进时间轮并唤醒到期的定时器
361    fn advance_timers(&mut self) {
362        use crate::time::global_timer;
363
364        let now = Instant::now();
365        let elapsed = now.duration_since(self.last_timer_advance);
366
367        // Convert elapsed time to ticks (1ms per tick)
368        // 将经过时间转换为滴答数(每毫秒1个滴答)
369        let ticks_to_advance = elapsed.as_millis() as u64;
370
371        if ticks_to_advance > 0 {
372            let _expired = global_timer().advance(ticks_to_advance);
373            self.last_timer_advance = now;
374        }
375    }
376
377    /// Flush any remaining events in the driver
378    /// 刷新driver中的任何剩余事件
379    fn flush_events(&mut self) -> io::Result<()> {
380        // Submit pending operations
381        // 提交挂起的操作
382        let _ = self.driver.submit();
383
384        // Process any remaining completions without blocking
385        // 不阻塞地处理任何剩余的完成事件
386        let _ = self.driver.wait_timeout(Duration::from_millis(0))?;
387
388        // Process completions
389        // 处理完成事件
390        self.process_completions();
391
392        Ok(())
393    }
394}
395
/// Spawning handle for the runtime.
///
/// Provides access to runtime functionality from within tasks. A handle is
/// installed in a thread-local slot for the duration of `Runtime::block_on`
/// and can be retrieved with `Handle::current` or `Handle::try_current`.
#[derive(Clone)]
pub struct Handle {
    /// The scheduler handle, obtained from the runtime's scheduler.
    scheduler_handle: SchedulerHandle,
}
406
407impl Handle {
408    /// Get a handle to the current runtime
409    /// 获取当前运行时的句柄
410    ///
411    /// # Panics / 恐慌
412    ///
413    /// Panics if called outside of a runtime context.
414    /// 如果在运行时上下文之外调用则恐慌。
415    #[allow(clippy::expect_used)]
416    pub fn current() -> Self {
417        Self::try_current().expect("Handle::current() called outside of a runtime context")
418    }
419
420    /// Try to get a handle to the current runtime. Returns None if outside a runtime.
421    /// 尝试获取当前运行时的句柄。如果在运行时外部则返回None。
422    pub fn try_current() -> Option<Self> {
423        CURRENT_HANDLE.with(|h| h.borrow().clone())
424    }
425
426    /// Set the current runtime handle for this thread
427    /// 为当前线程设置运行时句柄
428    fn set_current(handle: Option<Handle>) {
429        CURRENT_HANDLE.with(|h| *h.borrow_mut() = handle);
430    }
431
432    /// Get the scheduler handle
433    /// 获取调度器句柄
434    pub fn scheduler(&self) -> &SchedulerHandle {
435        &self.scheduler_handle
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};

    /// Build a runtime with the default configuration, panicking on failure.
    fn new_runtime() -> Runtime {
        Runtime::new().unwrap()
    }

    // ---- configuration and builder -------------------------------------

    #[test]
    fn test_runtime_config_default() {
        let defaults = RuntimeConfig::default();

        assert_eq!(defaults.scheduler.queue_size, 256);
        assert!(defaults.enable_parking);
        assert_eq!(defaults.park_timeout.as_millis(), 100);
    }

    #[test]
    fn test_runtime_builder() {
        // Later setters must win over the values chosen by `worker_threads`.
        let built = RuntimeBuilder::new()
            .worker_threads(4)
            .queue_size(512)
            .thread_name("test-worker")
            .enable_parking(false);

        let cfg = &built.config;
        assert_eq!(cfg.scheduler.queue_size, 512);
        assert_eq!(cfg.scheduler.thread_name, "test-worker");
        assert!(!cfg.enable_parking);
    }

    #[test]
    fn test_runtime_builder_driver_config() {
        let built = RuntimeBuilder::new()
            .driver_type(DriverType::Auto)
            .io_entries(512)
            .park_timeout(Duration::from_millis(50));

        let cfg = &built.config;
        assert_eq!(cfg.driver_io.entries, 512);
        assert_eq!(cfg.park_timeout.as_millis(), 50);
    }

    // ---- construction and block_on ---------------------------------------

    #[test]
    fn test_runtime_creation() {
        let created = Runtime::new();

        // Construction is only asserted to succeed on these targets; elsewhere
        // the result is just dropped.
        let must_succeed = cfg!(any(
            target_os = "linux",
            target_os = "macos",
            target_os = "freebsd",
            target_os = "netbsd",
            target_os = "openbsd",
            target_os = "dragonfly"
        ));
        if must_succeed {
            assert!(created.is_ok());
        }
    }

    #[test]
    fn test_block_on_simple() {
        let mut rt = new_runtime();
        let outcome = rt.block_on(async {});
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_block_on_with_config() {
        let short_park = RuntimeConfig {
            park_timeout: Duration::from_millis(10),
            ..RuntimeConfig::default()
        };
        let mut rt = Runtime::with_config(short_park).unwrap();

        let outcome = rt.block_on(async {});
        assert!(outcome.is_ok());
    }

    // ---- spawning: side effects --------------------------------------------

    #[test]
    fn test_spawn_executes_through_scheduler() {
        let mut rt = new_runtime();
        let cell = Arc::new(AtomicI32::new(0));
        let writer = cell.clone();

        rt.block_on(async move {
            let task = crate::task::spawn(async move {
                writer.store(42, Ordering::SeqCst);
            });
            let _ = task.wait().await;
        })
        .unwrap();

        assert_eq!(cell.load(Ordering::SeqCst), 42);
    }

    #[test]
    fn test_multiple_spawns() {
        let mut rt = new_runtime();
        let counter = Arc::new(AtomicI32::new(0));

        rt.block_on(async {
            // Spawn all ten tasks first, then join them in spawn order.
            let tasks: Vec<_> = (0..10)
                .map(|_| {
                    let shared = counter.clone();
                    crate::task::spawn(async move {
                        shared.fetch_add(1, Ordering::SeqCst);
                    })
                })
                .collect();

            for task in tasks {
                let _ = task.wait().await;
            }
        })
        .unwrap();

        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    #[test]
    fn test_spawn_join_handle_is_finished() {
        let mut rt = new_runtime();
        let done = Arc::new(AtomicBool::new(false));
        let setter = done.clone();

        rt.block_on(async move {
            let task = crate::task::spawn(async move {
                setter.store(true, Ordering::SeqCst);
            });
            let _ = task.wait().await;

            // Once `wait` has resolved, the task body must have run.
            assert!(done.load(Ordering::SeqCst));
        })
        .unwrap();
    }

    // ---- spawning: return values -------------------------------------------

    #[test]
    fn test_spawn_returns_value() {
        let mut rt = new_runtime();

        rt.block_on(async {
            let task = crate::task::spawn(async { 42i32 });
            let value = task.wait().await.unwrap();
            assert_eq!(value, 42);
        })
        .unwrap();
    }

    #[test]
    fn test_spawn_with_async_computation() {
        let mut rt = new_runtime();

        rt.block_on(async {
            let h1 = crate::task::spawn(async { 1i32 });
            let h2 = crate::task::spawn(async { 2i32 });
            let h3 = crate::task::spawn(async { 3i32 });

            // Join in spawn order and accumulate.
            let mut total = 0i32;
            for task in [h1, h2, h3] {
                total += task.wait().await.unwrap();
            }

            assert_eq!(total, 6);
        })
        .unwrap();
    }

    #[test]
    fn test_spawn_join_handle_id() {
        let mut rt = new_runtime();

        rt.block_on(async {
            let first = crate::task::spawn(async { 1i32 });
            let second = crate::task::spawn(async { 2i32 });

            // Ids are non-zero and unique per task.
            assert_ne!(first.id(), 0);
            assert_ne!(second.id(), 0);
            assert_ne!(first.id(), second.id());

            let _ = first.wait().await;
            let _ = second.wait().await;
        })
        .unwrap();
    }

    #[test]
    fn test_spawn_string_return() {
        let mut rt = new_runtime();

        rt.block_on(async {
            let task = crate::task::spawn(async { String::from("hello") });
            let text = task.wait().await.unwrap();
            assert_eq!(text, "hello");
        })
        .unwrap();
    }

    #[test]
    fn test_spawn_vec_return() {
        let mut rt = new_runtime();

        rt.block_on(async {
            let task = crate::task::spawn(async { vec![1, 2, 3] });
            let items = task.wait().await.unwrap();
            assert_eq!(items, vec![1, 2, 3]);
        })
        .unwrap();
    }

    #[test]
    fn test_spawn_tuple_return() {
        let mut rt = new_runtime();

        rt.block_on(async {
            let task = crate::task::spawn(async { (42i32, true, "test".to_string()) });
            let triple = task.wait().await.unwrap();
            assert_eq!(triple, (42, true, "test".to_string()));
        })
        .unwrap();
    }

    #[test]
    fn test_spawn_unit_return() {
        let mut rt = new_runtime();

        rt.block_on(async {
            let task: crate::task::JoinHandle<()> = crate::task::spawn(async {});
            let joined = task.wait().await;
            assert!(joined.is_ok());
        })
        .unwrap();
    }

    #[test]
    fn test_spawn_option_return() {
        let mut rt = new_runtime();

        rt.block_on(async {
            let task = crate::task::spawn(async { Some(42i32) });
            let maybe = task.wait().await.unwrap();
            assert_eq!(maybe, Some(42));
        })
        .unwrap();
    }

    #[test]
    fn test_nested_spawn() {
        let mut rt = new_runtime();

        rt.block_on(async {
            // The outer task spawns and joins an inner task of its own.
            let outer = crate::task::spawn(async {
                let inner = crate::task::spawn(async { 10i32 });
                inner.wait().await.unwrap()
            });
            let value = outer.wait().await.unwrap();
            assert_eq!(value, 10);
        })
        .unwrap();
    }

    // ---- thread-local handle -----------------------------------------------

    #[test]
    fn test_handle_current_and_try_current() {
        let mut rt = new_runtime();

        rt.block_on(async {
            // Inside the runtime both accessors must find a handle.
            let current = Handle::current();
            assert!(Handle::try_current().is_some());

            // The scheduler handle must be reachable through it.
            let _scheduler = current.scheduler();
        })
        .unwrap();

        // Once `block_on` has returned, the slot is empty again.
        assert!(Handle::try_current().is_none());
    }

    #[test]
    #[should_panic(expected = "outside of a runtime context")]
    fn test_handle_current_panics_outside_runtime() {
        let _ = Handle::current();
    }
}