Skip to main content

hiver_runtime/scheduler/
local.rs

1//! Local scheduler for thread-per-core runtime
2//! thread-per-core运行时的本地调度器
3
4use std::os::fd::RawFd;
5use std::sync::Arc;
6use std::sync::Mutex;
7use std::thread::{self, JoinHandle};
8use std::time::Duration;
9
10use super::{RawTask, handle::SchedulerHandle, queue::LocalQueue};
11
/// Settings used when constructing a [`Scheduler`].
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
    /// Size handed to `LocalQueue::new` for both the local queue and the
    /// injection queue.
    pub queue_size: usize,
    /// Core the worker thread should be pinned to; `None` leaves the thread
    /// unpinned. Pinning is only implemented on Linux.
    pub cpu_affinity: Option<usize>,
    /// Name assigned to the worker thread (used verbatim, nothing is appended).
    pub thread_name: String,
}

impl Default for SchedulerConfig {
    /// Queue size 256, no CPU pinning, worker thread named `hiver-worker`.
    fn default() -> Self {
        let thread_name = String::from("hiver-worker");
        Self {
            thread_name,
            cpu_affinity: None,
            queue_size: 256,
        }
    }
}
33
/// Local scheduler that owns a single worker thread.
///
/// Construction spawns the worker thread, which repeatedly pops tasks from
/// this scheduler's own queues and polls them. The worker only ever looks at
/// its local queue and its injection queue; nothing in this type steals work
/// from other schedulers.
pub struct Scheduler {
    /// Local task queue: fed by `submit` and drained first by the worker loop.
    queue: Arc<LocalQueue>,
    /// Injection queue: shared with every `SchedulerHandle` and drained by the
    /// worker loop whenever the local queue is empty.
    inject_queue: Arc<LocalQueue>,
    /// Wake channel: notified on submit/shutdown; the idle worker blocks on it.
    wake: Arc<super::handle::WakeChannel>,
    /// Lifecycle state shared with the worker thread (one of the `STATE_*`
    /// constants).
    state: Arc<std::sync::atomic::AtomicU8>,
    /// Join handle of the worker thread; `None` once `join` has taken it.
    thread_handle: Option<JoinHandle<()>>,
    /// Wakers keyed by task id. Only accessed through the `*_task_waker`
    /// methods; the worker thread does not receive a clone of this map.
    task_wakers: Arc<Mutex<std::collections::HashMap<u64, std::task::Waker>>>,
}
56
// Lifecycle states stored in `Scheduler::state`.
/// Initial state; the worker loop keeps iterating while the state equals this.
const STATE_RUNNING: u8 = 0;
/// Written by `Scheduler::shutdown` to ask the worker loop to exit.
const STATE_SHUTTING_DOWN: u8 = 1;
/// Written by the worker thread itself after its loop has exited.
const STATE_STOPPED: u8 = 2;
61
62impl Scheduler {
63    /// Create a new scheduler with default configuration
64    /// 使用默认配置创建新调度器
65    ///
66    /// # Errors / 错误
67    ///
68    /// Returns an error if the wake channel cannot be created.
69    /// 如果无法创建唤醒通道则返回错误。
70    pub fn new() -> std::io::Result<Self> {
71        Self::with_config(&SchedulerConfig::default())
72    }
73
74    /// Create a new scheduler with the specified configuration
75    /// 使用指定配置创建新调度器
76    ///
77    /// # Errors / 错误
78    ///
79    /// Returns an error if:
80    /// 返回错误如果:
81    /// - Configuration is invalid / 配置无效
82    /// - Wake channel creation fails / 唤醒通道创建失败
83    pub fn with_config(config: &SchedulerConfig) -> std::io::Result<Self> {
84        let queue = Arc::new(LocalQueue::new(config.queue_size));
85        let inject_queue = Arc::new(LocalQueue::new(config.queue_size));
86        let wake = Arc::new(super::handle::WakeChannel::new()?);
87        let task_wakers = Arc::new(Mutex::new(std::collections::HashMap::new()));
88
89        let state = Arc::new(std::sync::atomic::AtomicU8::new(STATE_RUNNING));
90
91        // Clone for thread closure
92        // 为线程闭包克隆
93        let queue_clone = queue.clone();
94        let inject_queue_clone = inject_queue.clone();
95        let wake_clone = wake.clone();
96        let state_clone = state.clone();
97        let thread_name = config.thread_name.clone();
98        let cpu_affinity = config.cpu_affinity;
99
100        // Spawn the worker thread
101        // 生成工作线程
102        let thread_handle = thread::Builder::new().name(thread_name).spawn(move || {
103            // Set CPU affinity if specified
104            // 如果指定了,设置CPU亲和性
105            if let Some(core) = cpu_affinity {
106                Self::set_cpu_affinity(core);
107            }
108
109            // Run the scheduler loop
110            // 运行调度器循环
111            Self::run_scheduler(&queue_clone, &inject_queue_clone, &wake_clone, &state_clone);
112        })?;
113
114        Ok(Self {
115            queue,
116            inject_queue,
117            wake,
118            state,
119            thread_handle: Some(thread_handle),
120            task_wakers,
121        })
122    }
123
124    /// Create a new scheduler with the specified configuration and driver
125    /// 使用指定配置和driver创建新调度器
126    ///
127    /// # Errors / 错误
128    ///
129    /// Returns an error if:
130    /// 返回错误如果:
131    /// - Configuration is invalid / 配置无效
132    /// - Wake channel creation fails / 唤醒通道创建失败
133    pub fn with_config_and_driver(
134        config: &SchedulerConfig,
135        _driver: Arc<dyn crate::driver::Driver>,
136    ) -> std::io::Result<Self> {
137        let queue = Arc::new(LocalQueue::new(config.queue_size));
138        let inject_queue = Arc::new(LocalQueue::new(config.queue_size));
139        let wake = Arc::new(super::handle::WakeChannel::new()?);
140        let task_wakers = Arc::new(Mutex::new(std::collections::HashMap::new()));
141
142        let state = Arc::new(std::sync::atomic::AtomicU8::new(STATE_RUNNING));
143
144        // Clone for thread closure
145        // 为线程闭包克隆
146        let queue_clone = queue.clone();
147        let inject_queue_clone = inject_queue.clone();
148        let wake_clone = wake.clone();
149        let state_clone = state.clone();
150        let thread_name = config.thread_name.clone();
151        let cpu_affinity = config.cpu_affinity;
152
153        // Spawn the worker thread
154        // 生成工作线程
155        let thread_handle = thread::Builder::new().name(thread_name).spawn(move || {
156            // Set CPU affinity if specified
157            // 如果指定了,设置CPU亲和性
158            if let Some(core) = cpu_affinity {
159                Self::set_cpu_affinity(core);
160            }
161
162            // Run the scheduler loop with driver
163            // 运行带driver的调度器循环
164            // Driver is stored by Runtime and used in its block_on event loop.
165            // Scheduler worker handles task polling; Runtime handles I/O events
166            // and wakes tasks via waker → re-enqueue → wake channel notification.
167            // Driver由Runtime持有并在block_on事件循环中使用。
168            // Scheduler worker负责任务轮询;Runtime处理I/O事件,
169            // 通过waker → 重新入队 → wake通道通知来唤醒任务。
170            Self::run_scheduler(&queue_clone, &inject_queue_clone, &wake_clone, &state_clone);
171        })?;
172
173        Ok(Self {
174            queue,
175            inject_queue,
176            wake,
177            state,
178            thread_handle: Some(thread_handle),
179            task_wakers,
180        })
181    }
182
183    /// Get a handle to this scheduler for external task submission
184    /// 获取此调度器的句柄用于外部任务提交
185    #[must_use]
186    pub fn handle(&self) -> SchedulerHandle {
187        SchedulerHandle::new(self.inject_queue.clone(), self.wake.clone())
188    }
189
190    /// Request the scheduler to shut down gracefully
191    /// 请求调度器优雅关闭
192    pub fn shutdown(&self) {
193        self.state
194            .store(STATE_SHUTTING_DOWN, std::sync::atomic::Ordering::Release);
195        // Notify the scheduler to wake up and check state
196        // 通知调度器唤醒并检查状态
197        self.wake.notify();
198    }
199
200    /// Wait for the scheduler to stop
201    /// 等待调度器停止
202    ///
203    /// # Panics / 恐慌
204    ///
205    /// Panics if the scheduler thread has already been joined.
206    /// 如果调度器线程已被加入则恐慌。
207    pub fn join(&mut self) -> thread::Result<()> {
208        if let Some(handle) = self.thread_handle.take() {
209            handle.join()
210        } else {
211            Ok(())
212        }
213    }
214
215    /// Main scheduler loop
216    /// 主调度器循环
217    fn run_scheduler(
218        local_queue: &LocalQueue,
219        inject_queue: &LocalQueue,
220        wake: &super::handle::WakeChannel,
221        state: &std::sync::atomic::AtomicU8,
222    ) {
223        while state.load(std::sync::atomic::Ordering::Relaxed) == STATE_RUNNING {
224            // Try to get a task from local queue first
225            // 首先尝试从本地队列获取任务
226            let task = local_queue.pop().or_else(|| {
227                // Try inject queue (external submissions)
228                // 尝试注入队列(外部提交)
229                inject_queue.pop()
230            });
231
232            if let Some(task) = task {
233                // Execute the task by polling its future via the vtable
234                // 通过vtable轮询其future来执行任务
235                let completed = unsafe { crate::task::raw_task::poll_raw_task(task) };
236                if completed {
237                    // Task finished, consume queue ref
238                    unsafe {
239                        crate::task::raw_task::deallocate_completed_task(task);
240                    }
241                }
242                // If Pending: waker holds the ref and will re-enqueue when ready
243                // 如果Pending:waker持有引用,就绪时会重新入队
244            } else {
245                // No tasks available, block on wake channel with timeout
246                // 没有可用任务,带超时阻塞在唤醒通道上
247                wake.recv_timeout(Duration::from_millis(10));
248            }
249        }
250
251        state.store(STATE_STOPPED, std::sync::atomic::Ordering::Release);
252    }
253
254    /// Set CPU affinity for the current thread
255    /// 为当前线程设置CPU亲和性
256    #[cfg(target_os = "linux")]
257    fn set_cpu_affinity(core: usize) {
258        unsafe {
259            let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();
260            libc::CPU_ZERO(&mut cpu_set);
261            libc::CPU_SET(core % libc::CPU_SETSIZE as usize, &mut cpu_set);
262
263            let _ = libc::sched_setaffinity(0, size_of::<libc::cpu_set_t>(), &cpu_set);
264        }
265    }
266
267    #[cfg(not(target_os = "linux"))]
268    fn set_cpu_affinity(_core: usize) {
269        // CPU affinity is only supported on Linux
270        // CPU亲和性仅在Linux上支持
271    }
272
273    /// Submit a task to this scheduler
274    /// 向此调度器提交任务
275    pub fn submit(&self, task: RawTask) -> Result<(), RawTask> {
276        if self.queue.push(task) {
277            self.wake.notify();
278            Ok(())
279        } else {
280            Err(task)
281        }
282    }
283
284    /// Get the wake file descriptor for epoll registration
285    /// 获取用于epoll注册的唤醒文件描述符
286    #[must_use]
287    pub fn wake_fd(&self) -> RawFd {
288        self.wake.raw_fd()
289    }
290
291    /// Get a task waker by ID
292    /// 通过ID获取任务waker
293    pub fn get_task_waker(&self, id: u64) -> Option<std::task::Waker> {
294        let wakers = self.task_wakers.lock().unwrap();
295        wakers.get(&id).cloned()
296    }
297
298    /// Register a task waker
299    /// 注册任务waker
300    pub fn register_task_waker(&self, id: u64, waker: std::task::Waker) {
301        let mut wakers = self.task_wakers.lock().unwrap();
302        wakers.insert(id, waker);
303    }
304
305    /// Remove a task waker
306    /// 移除任务waker
307    pub fn remove_task_waker(&self, id: u64) -> Option<std::task::Waker> {
308        let mut wakers = self.task_wakers.lock().unwrap();
309        wakers.remove(&id)
310    }
311}
312
313impl Drop for Scheduler {
314    fn drop(&mut self) {
315        // Ensure scheduler is stopped
316        // 确保调度器已停止
317        self.shutdown();
318        let _ = self.join();
319    }
320}
321
#[cfg(test)]
mod tests {
    use super::*;

    /// Stops and joins the worker thread.
    ///
    /// The tests below push placeholder pointers that do not refer to real
    /// tasks. A running worker would pop and poll them, which is undefined
    /// behaviour, so the worker is always stopped before anything is queued.
    /// NOTE(review): this relies on the queues not dereferencing their
    /// contents on push or drop — confirm against `LocalQueue`.
    fn stop_worker(scheduler: &mut Scheduler) {
        scheduler.shutdown();
        assert!(scheduler.join().is_ok());
    }

    #[test]
    fn test_scheduler_creation() {
        let scheduler = Scheduler::new();
        assert!(scheduler.is_ok());

        let mut scheduler = scheduler.unwrap();
        let handle = scheduler.handle();

        stop_worker(&mut scheduler);
        assert!(handle.submit(0x1000 as RawTask).is_ok());
    }

    #[test]
    fn test_scheduler_config() {
        let config = SchedulerConfig {
            queue_size: 512,
            cpu_affinity: Some(0),
            thread_name: "test-worker".to_string(),
        };

        let scheduler = Scheduler::with_config(&config);
        assert!(scheduler.is_ok());
    }

    #[test]
    fn test_scheduler_submit_and_handle() {
        let mut scheduler = Scheduler::new().unwrap();
        let handle = scheduler.handle();

        stop_worker(&mut scheduler);

        // Submit multiple placeholder tasks; they stay queued and are never polled.
        assert!(handle.submit(0x1000 as RawTask).is_ok());
        assert!(handle.submit(0x2000 as RawTask).is_ok());

        // Wake fd should be a valid file descriptor
        assert!(handle.wake_fd() >= 0);
    }

    #[test]
    fn test_scheduler_waker_store_empty() {
        let scheduler = Scheduler::new().unwrap();

        // Non-existent waker should return None
        assert!(scheduler.get_task_waker(9999).is_none());

        // Removing non-existent waker should return None
        assert!(scheduler.remove_task_waker(9999).is_none());
    }

    #[test]
    fn test_scheduler_shutdown() {
        let mut scheduler = Scheduler::new().unwrap();
        scheduler.shutdown();
        assert!(scheduler.join().is_ok());

        // Joining again is a no-op, and the drop that follows repeats
        // shutdown + join without blocking.
        assert!(scheduler.join().is_ok());
    }
}
377}