uefi_async/no_alloc/executor.rs
1// use crate::bss::lifo::{Queue, Stealer, Worker};
2// use crate::bss::task::{SafeFuture, TaskHeader, TaskPool, TaskSlot, SLOT_EMPTY, SLOT_OCCUPIED};
3// use crate::bss::waker::WakePolicy;
4// use core::pin::Pin;
5// use core::ptr::write;
6// use core::sync::atomic::Ordering;
7// use core::task::{Context, Poll, Waker};
8//
9// pub struct Executor<const N: usize> {
10// /// 本地核心的 Worker
11// pub worker: Worker<N>,
12// /// 其他核心的 Stealers
13// pub stealers: &'static [Stealer<N>],
14// }
15//
16// impl<const N: usize> Executor<N> {
17// pub const fn new(worker: Worker<N>, stealers: &'static [Stealer<N>]) -> Self {
18// Self { worker, stealers }
19// }
20//
21// pub fn run_step(&self, waker: &Waker) -> bool {
22// // 1. & 2. 获取任务指针 (优先本地 LIFO,其次跨核窃取 FIFO)
23// let task_ptr = self.worker.pop().or_else(|| {
24// self.stealers.iter().find_map(|s| {
25// // steal_and_pop 直接返回一个可用的任务,同时将其余任务转移到本地
26// s.steal_and_pop(&self.worker, |n| (n + 1) / 2).ok().map(|(ptr, _)| ptr)
27// })
28// });
29//
30// // 3. 执行任务
31// if let Some(ptr) = task_ptr {
32// unsafe {
33// // 此时 ptr 指向 TaskSlot<F> 的开头,即 TaskHeader 的开头
34// let header = &*(ptr as *const TaskHeader);
35//
36// // 直接调用 poll_handle
37// // 内部逻辑(WakePolicy 判定、类型还原、Poll 执行)全部封装在 poll_wrapper 中
38// let _ = (header.poll_handle)(ptr, waker);
39// }
40// return true;
41// }
42// false
43// }
44//
45// pub fn spawn<F: SafeFuture>(&self, f: F) {
46// }
47// }
48//
49// impl<F: SafeFuture> TaskSlot<F> {
50// /// 翻译被抹掉的类型
51// pub fn poll_wrapper(ptr: *mut (), waker: &Waker) -> Poll<()> {
52// unsafe {
53// let slot = &*(ptr as *const Self);
54// let futrue = slot.future.uninit();
55//
56// // 1. 原子获取并清除唤醒位
57// let prev_val = slot.header.control.fetch_and(!WakePolicy::WAKE_BIT, Ordering::Acquire);
58//
59// // 2. 使用 WakePolicy 内部逻辑解包并判定
60// let (policy, is_waked) = WakePolicy::unpack(prev_val);
61//
62// if !policy.should_poll(is_waked) {
63// // InterruptOnly 且未被唤醒的情况
64// return Poll::Pending;
65// }
66//
67// // 3. 执行真正的 Future 推进
68// let future_mut = &mut *(*slot.future.get()).as_mut_ptr();
69// let res = Pin::new_unchecked(&mut *future_mut).poll(&mut Context::from_waker(waker));
70//
71// if res.is_ready() {
72// core::ptr::drop_in_place(future_mut);
73// slot.header.occupied.store(SLOT_EMPTY, Ordering::Release);
74// }
75//
76// res
77// }
78// }
79//
80// }
81//
82// pub const MAX_CORES: usize = 4;
83// pub const QUEUE_SIZE: usize = 256;
84//
85// // 1. 全局原始队列池
86// pub static GLOBAL_QUEUES: [Queue<QUEUE_SIZE>; MAX_CORES] = [
87// Queue::new(), Queue::new(), Queue::new(), Queue::new()
88// ];
89//
90// // 2. 全局 Stealer 矩阵 (排除自己)
91// static STEALER_POOL: [[Stealer<QUEUE_SIZE>; MAX_CORES - 1]; MAX_CORES] = {
92// let mut pool = [[Stealer { queue: &GLOBAL_QUEUES[0] }; MAX_CORES - 1]; MAX_CORES];
93// let mut i = 0;
94// while i < MAX_CORES {
95// let mut j = 0;
96// let mut target = 0;
97// while target < MAX_CORES {
98// if i != target {
99// pool[i][j] = Stealer { queue: &GLOBAL_QUEUES[target] };
100// j += 1;
101// }
102// target += 1;
103// }
104// i += 1;
105// }
106// pool
107// };
108//
109// /// 每个 CPU 核心启动时调用的初始化函数
110// pub fn init_executor(core_id: usize) -> Executor<QUEUE_SIZE> {
111// let worker = Worker::new(&GLOBAL_QUEUES[core_id]);
112// let stealers = &STEALER_POOL[core_id];
113// Executor::new(worker, stealers)
114// }
115//
116// /// 将任务指针重新调度到任意可用的全局队列中
117// /// 由 Waker 调用
118// pub fn schedule_task(ptr: *mut ()) {
119// // 简单策略:遍历所有核心的队列,尝试推入
120// // 实际生产中可能优先推入当前核心或随机选择
121// for q in GLOBAL_QUEUES.iter() {
122// let worker = Worker::new(q);
123// if worker.push(ptr).is_ok() {
124// return;
125// }
126// }
127// // 如果所有队列都满了,这是一个严重问题(系统过载)
128// // 在这个简易实现中,我们只能丢弃这次唤醒(可能会导致任务饿死),或者在这里自旋等待
129// }
130//
131// impl<const N: usize> Worker<N> {
132// /// 从指定的 TaskPool 中分配槽位并推入队列
133// pub fn spawn_task<F, const POOL_N: usize>(
134// &self,
135// pool: &'static TaskPool<F, POOL_N>,
136// fut: F
137// ) -> Result<(), F>
138// where F: Future<Output = ()> + 'static + Send + Sync
139// {
140// // 1. 寻找空闲槽位
141// for slot in pool.0.iter() {
142// // 尝试锁定槽位
143// if slot.header.occupied.compare_exchange(
144// SLOT_EMPTY, SLOT_OCCUPIED, Ordering::Acquire, Ordering::Relaxed
145// ).is_ok() {
146// unsafe {
147// // 2. 初始化 Future 内容
148// let future_ptr = (*slot.future.get()).as_mut_ptr();
149// write(future_ptr, fut);
150//
151// // 3. 将槽位地址推入 LIFO 队列
152// // 如果队列满了,我们需要退还槽位所有权
153// let ptr = slot as *const TaskSlot<F> as *mut ();
154// if let Err(_) = self.push(ptr) {
155// let recovered_fut = core::ptr::read(future_ptr); // 拿回所有权
156// slot.header.occupied.store(SLOT_EMPTY, Ordering::Release);
157// return Err(recovered_fut);
158// }
159// }
160// return Ok(());
161// }
162// }
163// Err(fut)
164// }
165// }
166
167use core::hint::spin_loop;
168use core::sync::atomic::Ordering;
169use crate::no_alloc::lifo::{Queue, Stealer, Worker};
170use crate::no_alloc::task::{State, TaskHeader};
171
172pub const CORE_SIZE: usize = 4;
173pub const QUEUE_SIZE: usize = 256;
174
175pub static GLOBAL_QUEUES: [Queue<QUEUE_SIZE>; CORE_SIZE] = [
176 Queue::new(), Queue::new(), Queue::new(), Queue::new()
177];
178
179pub static STEALER_POOL: [Stealer<QUEUE_SIZE>; CORE_SIZE] = {
180 let mut stealers = [Stealer(&GLOBAL_QUEUES[0]); CORE_SIZE];
181 let mut i = 0;
182 while i < CORE_SIZE {
183 stealers[i] = Stealer(&GLOBAL_QUEUES[i]);
184 i += 1;
185 }
186 stealers
187};
188
189pub struct Executor {
190 pub worker: Worker<QUEUE_SIZE>,
191 pub core_id: usize,
192}
193
194impl Executor {
195 pub fn new(core_id: usize) -> Self {
196 assert!(core_id < CORE_SIZE, "The pool is not enough for core allocation");
197 Executor { worker: Worker::new(&GLOBAL_QUEUES[core_id]), core_id }
198 }
199
200 #[inline(always)]
201 pub fn add(&self, ptr: *mut TaskHeader) {
202 let head = unsafe { &*ptr };
203 if head.state.compare_exchange(
204 State::Initialized.into(), State::Ready.into(), Ordering::Acquire, Ordering::Relaxed
205 ).is_err() { todo!("Failed to change task state") }
206 if let Err(h) = self.worker.push(ptr) {
207 // TODO: User Custom Error Handler
208 panic!("Failed to push task to local queue: {:?}", h);
209 }
210 }
211
212 pub fn run(&self) {
213 loop {
214 // 1. 本地 pop
215 if let Some(task) = self.worker.pop() {
216 self.run_task(task);
217 continue
218 }
219
220 // 2. 遍历全局 STEALER_POOL 窃取
221 if let Some(task) = self.try_steal() {
222 self.run_task(task);
223 continue
224 }
225
226 // 防止过度争抢总线
227 spin_loop();
228 }
229 }
230
231 fn try_steal(&self) -> Option<*mut TaskHeader> {
232 for (id, stealer) in STEALER_POOL.iter().enumerate() {
233 if id == self.core_id { continue }
234
235 // 尝试从其他核心偷取一个直接执行
236 if let Ok((task, _)) = stealer.steal_and_pop(&self.worker, |_| 1) {
237 return Some(task);
238 }
239 }
240 None
241 }
242
243 fn run_task(&self, ptr: *mut TaskHeader) {
244 let head = unsafe { &*ptr };
245
246 if head.state.compare_exchange(
247 State::Ready.into(), State::Running.into(), Ordering::Acquire, Ordering::Relaxed
248 ).is_err() { todo!("Failed to change task state") }
249
250 // SAFETY: TaskHeader size equaled.
251 let is_ready = unsafe { (head.poll)(ptr) };
252 if is_ready {
253 if head.state.compare_exchange(
254 State::Running.into(), State::Unreachable.into(), Ordering::Acquire, Ordering::Relaxed
255 ).is_err() { todo!("Failed to change task state") }
256 }
257 }
258}