hiver_runtime/scheduler/
local.rs1use std::os::fd::RawFd;
5use std::sync::Arc;
6use std::sync::Mutex;
7use std::thread::{self, JoinHandle};
8use std::time::Duration;
9
10use super::{RawTask, handle::SchedulerHandle, queue::LocalQueue};
11
/// Settings used when starting a [`Scheduler`] worker thread.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchedulerConfig {
    /// Capacity passed to each of the scheduler's task queues (local and inject).
    pub queue_size: usize,
    /// CPU core the worker thread should be pinned to; `None` leaves the
    /// thread's affinity untouched.
    pub cpu_affinity: Option<usize>,
    /// Name given to the spawned worker thread.
    pub thread_name: String,
}
23
24impl Default for SchedulerConfig {
25 fn default() -> Self {
26 Self {
27 queue_size: 256,
28 cpu_affinity: None,
29 thread_name: "hiver-worker".to_string(),
30 }
31 }
32}
33
34pub struct Scheduler {
43 queue: Arc<LocalQueue>,
45 inject_queue: Arc<LocalQueue>,
47 wake: Arc<super::handle::WakeChannel>,
49 state: Arc<std::sync::atomic::AtomicU8>,
51 thread_handle: Option<JoinHandle<()>>,
53 task_wakers: Arc<Mutex<std::collections::HashMap<u64, std::task::Waker>>>,
55}
56
// Lifecycle of the worker thread, stored in `Scheduler::state`.
// The only transitions performed in this file are
// RUNNING -> SHUTTING_DOWN (requested by `shutdown`) and
// RUNNING / SHUTTING_DOWN -> STOPPED (written by the worker as it exits).

/// The worker loop keeps polling tasks while the state holds this value.
const STATE_RUNNING: u8 = 0;
/// Shutdown has been requested; the worker leaves its loop on the next check.
const STATE_SHUTTING_DOWN: u8 = 1;
/// The worker loop has exited.
const STATE_STOPPED: u8 = 2;
61
62impl Scheduler {
63 pub fn new() -> std::io::Result<Self> {
71 Self::with_config(&SchedulerConfig::default())
72 }
73
74 pub fn with_config(config: &SchedulerConfig) -> std::io::Result<Self> {
84 let queue = Arc::new(LocalQueue::new(config.queue_size));
85 let inject_queue = Arc::new(LocalQueue::new(config.queue_size));
86 let wake = Arc::new(super::handle::WakeChannel::new()?);
87 let task_wakers = Arc::new(Mutex::new(std::collections::HashMap::new()));
88
89 let state = Arc::new(std::sync::atomic::AtomicU8::new(STATE_RUNNING));
90
91 let queue_clone = queue.clone();
94 let inject_queue_clone = inject_queue.clone();
95 let wake_clone = wake.clone();
96 let state_clone = state.clone();
97 let thread_name = config.thread_name.clone();
98 let cpu_affinity = config.cpu_affinity;
99
100 let thread_handle = thread::Builder::new().name(thread_name).spawn(move || {
103 if let Some(core) = cpu_affinity {
106 Self::set_cpu_affinity(core);
107 }
108
109 Self::run_scheduler(&queue_clone, &inject_queue_clone, &wake_clone, &state_clone);
112 })?;
113
114 Ok(Self {
115 queue,
116 inject_queue,
117 wake,
118 state,
119 thread_handle: Some(thread_handle),
120 task_wakers,
121 })
122 }
123
124 pub fn with_config_and_driver(
134 config: &SchedulerConfig,
135 _driver: Arc<dyn crate::driver::Driver>,
136 ) -> std::io::Result<Self> {
137 let queue = Arc::new(LocalQueue::new(config.queue_size));
138 let inject_queue = Arc::new(LocalQueue::new(config.queue_size));
139 let wake = Arc::new(super::handle::WakeChannel::new()?);
140 let task_wakers = Arc::new(Mutex::new(std::collections::HashMap::new()));
141
142 let state = Arc::new(std::sync::atomic::AtomicU8::new(STATE_RUNNING));
143
144 let queue_clone = queue.clone();
147 let inject_queue_clone = inject_queue.clone();
148 let wake_clone = wake.clone();
149 let state_clone = state.clone();
150 let thread_name = config.thread_name.clone();
151 let cpu_affinity = config.cpu_affinity;
152
153 let thread_handle = thread::Builder::new().name(thread_name).spawn(move || {
156 if let Some(core) = cpu_affinity {
159 Self::set_cpu_affinity(core);
160 }
161
162 Self::run_scheduler(&queue_clone, &inject_queue_clone, &wake_clone, &state_clone);
171 })?;
172
173 Ok(Self {
174 queue,
175 inject_queue,
176 wake,
177 state,
178 thread_handle: Some(thread_handle),
179 task_wakers,
180 })
181 }
182
183 #[must_use]
186 pub fn handle(&self) -> SchedulerHandle {
187 SchedulerHandle::new(self.inject_queue.clone(), self.wake.clone())
188 }
189
190 pub fn shutdown(&self) {
193 self.state
194 .store(STATE_SHUTTING_DOWN, std::sync::atomic::Ordering::Release);
195 self.wake.notify();
198 }
199
200 pub fn join(&mut self) -> thread::Result<()> {
208 if let Some(handle) = self.thread_handle.take() {
209 handle.join()
210 } else {
211 Ok(())
212 }
213 }
214
215 fn run_scheduler(
218 local_queue: &LocalQueue,
219 inject_queue: &LocalQueue,
220 wake: &super::handle::WakeChannel,
221 state: &std::sync::atomic::AtomicU8,
222 ) {
223 while state.load(std::sync::atomic::Ordering::Relaxed) == STATE_RUNNING {
224 let task = local_queue.pop().or_else(|| {
227 inject_queue.pop()
230 });
231
232 if let Some(task) = task {
233 let completed = unsafe { crate::task::raw_task::poll_raw_task(task) };
236 if completed {
237 unsafe {
239 crate::task::raw_task::deallocate_completed_task(task);
240 }
241 }
242 } else {
245 wake.recv_timeout(Duration::from_millis(10));
248 }
249 }
250
251 state.store(STATE_STOPPED, std::sync::atomic::Ordering::Release);
252 }
253
254 #[cfg(target_os = "linux")]
257 fn set_cpu_affinity(core: usize) {
258 unsafe {
259 let mut cpu_set: libc::cpu_set_t = std::mem::zeroed();
260 libc::CPU_ZERO(&mut cpu_set);
261 libc::CPU_SET(core % libc::CPU_SETSIZE as usize, &mut cpu_set);
262
263 let _ = libc::sched_setaffinity(0, size_of::<libc::cpu_set_t>(), &cpu_set);
264 }
265 }
266
267 #[cfg(not(target_os = "linux"))]
268 fn set_cpu_affinity(_core: usize) {
269 }
272
273 pub fn submit(&self, task: RawTask) -> Result<(), RawTask> {
276 if self.queue.push(task) {
277 self.wake.notify();
278 Ok(())
279 } else {
280 Err(task)
281 }
282 }
283
284 #[must_use]
287 pub fn wake_fd(&self) -> RawFd {
288 self.wake.raw_fd()
289 }
290
291 pub fn get_task_waker(&self, id: u64) -> Option<std::task::Waker> {
294 let wakers = self.task_wakers.lock().unwrap();
295 wakers.get(&id).cloned()
296 }
297
298 pub fn register_task_waker(&self, id: u64, waker: std::task::Waker) {
301 let mut wakers = self.task_wakers.lock().unwrap();
302 wakers.insert(id, waker);
303 }
304
305 pub fn remove_task_waker(&self, id: u64) -> Option<std::task::Waker> {
308 let mut wakers = self.task_wakers.lock().unwrap();
309 wakers.remove(&id)
310 }
311}
312
313impl Drop for Scheduler {
314 fn drop(&mut self) {
315 self.shutdown();
318 let _ = self.join();
319 }
320}
321
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts a scheduler, takes a handle, then stops and joins the worker.
    ///
    /// The tests below enqueue fabricated `RawTask` values. A running worker
    /// would pop them and hand them to `poll_raw_task`, so the worker is
    /// stopped before anything fake is queued.
    fn stopped_scheduler() -> (Scheduler, SchedulerHandle) {
        let mut scheduler = Scheduler::new().expect("scheduler should start");
        let handle = scheduler.handle();
        scheduler.shutdown();
        scheduler
            .join()
            .expect("worker thread should exit cleanly");
        (scheduler, handle)
    }

    #[test]
    fn test_scheduler_creation() {
        assert!(Scheduler::new().is_ok());

        // Keep the scheduler alive so the queues outlive the submit call.
        let (_scheduler, handle) = stopped_scheduler();
        assert!(handle.submit(0x1000 as RawTask).is_ok());
    }

    #[test]
    fn test_scheduler_config() {
        let config = SchedulerConfig {
            queue_size: 512,
            cpu_affinity: Some(0),
            thread_name: "test-worker".to_string(),
        };

        let scheduler = Scheduler::with_config(&config);
        assert!(scheduler.is_ok());
    }

    #[test]
    fn test_scheduler_submit_and_handle() {
        let (_scheduler, handle) = stopped_scheduler();

        assert!(handle.submit(0x1000 as RawTask).is_ok());
        assert!(handle.submit(0x2000 as RawTask).is_ok());

        assert!(handle.wake_fd() >= 0);
    }

    #[test]
    fn test_scheduler_waker_store_empty() {
        let scheduler = Scheduler::new().unwrap();

        assert!(scheduler.get_task_waker(9999).is_none());

        assert!(scheduler.remove_task_waker(9999).is_none());
    }

    #[test]
    fn test_scheduler_shutdown() {
        let mut scheduler = Scheduler::new().unwrap();
        scheduler.shutdown();
        // A repeated request must be harmless; `Drop` issues one as well.
        scheduler.shutdown();
        assert!(scheduler.join().is_ok());
    }
}