uefi_async/no_alloc/
executor.rs

1// use crate::bss::lifo::{Queue, Stealer, Worker};
2// use crate::bss::task::{SafeFuture, TaskHeader, TaskPool, TaskSlot, SLOT_EMPTY, SLOT_OCCUPIED};
3// use crate::bss::waker::WakePolicy;
4// use core::pin::Pin;
5// use core::ptr::write;
6// use core::sync::atomic::Ordering;
7// use core::task::{Context, Poll, Waker};
8//
9// pub struct Executor<const N: usize> {
10//     /// 本地核心的 Worker
11//     pub worker: Worker<N>,
12//     /// 其他核心的 Stealers
13//     pub stealers: &'static [Stealer<N>],
14// }
15//
16// impl<const N: usize> Executor<N> {
17//     pub const fn new(worker: Worker<N>, stealers: &'static [Stealer<N>]) -> Self {
18//         Self { worker, stealers }
19//     }
20//
21//     pub fn run_step(&self, waker: &Waker) -> bool {
22//         // 1. & 2. 获取任务指针 (优先本地 LIFO,其次跨核窃取 FIFO)
23//         let task_ptr = self.worker.pop().or_else(|| {
24//             self.stealers.iter().find_map(|s| {
25//                 // steal_and_pop 直接返回一个可用的任务,同时将其余任务转移到本地
26//                 s.steal_and_pop(&self.worker, |n| (n + 1) / 2).ok().map(|(ptr, _)| ptr)
27//             })
28//         });
29//
30//         // 3. 执行任务
31//         if let Some(ptr) = task_ptr {
32//             unsafe {
33//                 // 此时 ptr 指向 TaskSlot<F> 的开头,即 TaskHeader 的开头
34//                 let header = &*(ptr as *const TaskHeader);
35//
36//                 // 直接调用 poll_handle
37//                 // 内部逻辑(WakePolicy 判定、类型还原、Poll 执行)全部封装在 poll_wrapper 中
38//                 let _ = (header.poll_handle)(ptr, waker);
39//             }
40//             return true;
41//         }
42//         false
43//     }
44//
45//     pub fn spawn<F: SafeFuture>(&self, f: F) {
46//     }
47// }
48//
49// impl<F: SafeFuture> TaskSlot<F> {
50//     /// 翻译被抹掉的类型
51//     pub fn poll_wrapper(ptr: *mut (), waker: &Waker) -> Poll<()> {
52//         unsafe {
53//             let slot = &*(ptr as *const Self);
54//             let futrue = slot.future.uninit();
55//
56//             // 1. 原子获取并清除唤醒位
57//             let prev_val = slot.header.control.fetch_and(!WakePolicy::WAKE_BIT, Ordering::Acquire);
58//
59//             // 2. 使用 WakePolicy 内部逻辑解包并判定
60//             let (policy, is_waked) = WakePolicy::unpack(prev_val);
61//
62//             if !policy.should_poll(is_waked) {
63//                 // InterruptOnly 且未被唤醒的情况
64//                 return Poll::Pending;
65//             }
66//
67//             // 3. 执行真正的 Future 推进
68//             let future_mut = &mut *(*slot.future.get()).as_mut_ptr();
69//             let res = Pin::new_unchecked(&mut *future_mut).poll(&mut Context::from_waker(waker));
70//
71//             if res.is_ready() {
72//                 core::ptr::drop_in_place(future_mut);
73//                 slot.header.occupied.store(SLOT_EMPTY, Ordering::Release);
74//             }
75//
76//             res
77//         }
78//     }
79//
80// }
81//
82// pub const MAX_CORES: usize = 4;
83// pub const QUEUE_SIZE: usize = 256;
84//
85// // 1. 全局原始队列池
86// pub static GLOBAL_QUEUES: [Queue<QUEUE_SIZE>; MAX_CORES] = [
87//     Queue::new(), Queue::new(), Queue::new(), Queue::new()
88// ];
89//
90// // 2. 全局 Stealer 矩阵 (排除自己)
91// static STEALER_POOL: [[Stealer<QUEUE_SIZE>; MAX_CORES - 1]; MAX_CORES] = {
92//     let mut pool = [[Stealer { queue: &GLOBAL_QUEUES[0] }; MAX_CORES - 1]; MAX_CORES];
93//     let mut i = 0;
94//     while i < MAX_CORES {
95//         let mut j = 0;
96//         let mut target = 0;
97//         while target < MAX_CORES {
98//             if i != target {
99//                 pool[i][j] = Stealer { queue: &GLOBAL_QUEUES[target] };
100//                 j += 1;
101//             }
102//             target += 1;
103//         }
104//         i += 1;
105//     }
106//     pool
107// };
108//
109// /// 每个 CPU 核心启动时调用的初始化函数
110// pub fn init_executor(core_id: usize) -> Executor<QUEUE_SIZE> {
111//     let worker = Worker::new(&GLOBAL_QUEUES[core_id]);
112//     let stealers = &STEALER_POOL[core_id];
113//     Executor::new(worker, stealers)
114// }
115//
116// /// 将任务指针重新调度到任意可用的全局队列中
117// /// 由 Waker 调用
118// pub fn schedule_task(ptr: *mut ()) {
119//     // 简单策略:遍历所有核心的队列,尝试推入
120//     // 实际生产中可能优先推入当前核心或随机选择
121//     for q in GLOBAL_QUEUES.iter() {
122//         let worker = Worker::new(q);
123//         if worker.push(ptr).is_ok() {
124//             return;
125//         }
126//     }
127//     // 如果所有队列都满了,这是一个严重问题(系统过载)
128//     // 在这个简易实现中,我们只能丢弃这次唤醒(可能会导致任务饿死),或者在这里自旋等待
129// }
130//
131// impl<const N: usize> Worker<N> {
132//     /// 从指定的 TaskPool 中分配槽位并推入队列
133//     pub fn spawn_task<F, const POOL_N: usize>(
134//         &self,
135//         pool: &'static TaskPool<F, POOL_N>,
136//         fut: F
137//     ) -> Result<(), F>
138//     where F: Future<Output = ()> + 'static + Send + Sync
139//     {
140//         // 1. 寻找空闲槽位
141//         for slot in pool.0.iter() {
142//             // 尝试锁定槽位
143//             if slot.header.occupied.compare_exchange(
144//                 SLOT_EMPTY, SLOT_OCCUPIED, Ordering::Acquire, Ordering::Relaxed
145//             ).is_ok() {
146//                 unsafe {
147//                     // 2. 初始化 Future 内容
148//                     let future_ptr = (*slot.future.get()).as_mut_ptr();
149//                     write(future_ptr, fut);
150//
151//                     // 3. 将槽位地址推入 LIFO 队列
152//                     // 如果队列满了,我们需要退还槽位所有权
153//                     let ptr = slot as *const TaskSlot<F> as *mut ();
154//                     if let Err(_) = self.push(ptr) {
155//                         let recovered_fut = core::ptr::read(future_ptr); // 拿回所有权
156//                         slot.header.occupied.store(SLOT_EMPTY, Ordering::Release);
157//                         return Err(recovered_fut);
158//                     }
159//                 }
160//                 return Ok(());
161//             }
162//         }
163//         Err(fut)
164//     }
165// }
166
167use core::hint::spin_loop;
168use core::sync::atomic::Ordering;
169use crate::no_alloc::lifo::{Queue, Stealer, Worker};
170use crate::no_alloc::task::{State, TaskHeader};
171
/// Number of cores served by the executor; sizes both `GLOBAL_QUEUES` and `STEALER_POOL`.
pub const CORE_SIZE: usize = 4;
/// Const parameter handed to every `Queue`, `Worker` and `Stealer` in this module
/// (presumably the per-queue slot count — confirm against the `lifo` module).
pub const QUEUE_SIZE: usize = 1 << 8;
174
175pub static GLOBAL_QUEUES: [Queue<QUEUE_SIZE>; CORE_SIZE] = [
176    Queue::new(), Queue::new(), Queue::new(), Queue::new()
177];
178
179pub static STEALER_POOL: [Stealer<QUEUE_SIZE>; CORE_SIZE] = {
180    let mut stealers = [Stealer(&GLOBAL_QUEUES[0]); CORE_SIZE];
181    let mut i = 0;
182    while i < CORE_SIZE {
183        stealers[i] = Stealer(&GLOBAL_QUEUES[i]);
184        i += 1;
185    }
186    stealers
187};
188
189pub struct Executor {
190    pub worker: Worker<QUEUE_SIZE>,
191    pub core_id: usize,
192}
193
194impl Executor {
195    pub fn new(core_id: usize) -> Self {
196        assert!(core_id < CORE_SIZE, "The pool is not enough for core allocation");
197        Executor { worker: Worker::new(&GLOBAL_QUEUES[core_id]), core_id }
198    }
199
200    #[inline(always)]
201    pub fn add(&self, ptr: *mut TaskHeader) {
202        let head = unsafe { &*ptr };
203        if head.state.compare_exchange(
204            State::Initialized.into(), State::Ready.into(), Ordering::Acquire, Ordering::Relaxed
205        ).is_err() { todo!("Failed to change task state") }
206        if let Err(h) = self.worker.push(ptr) {
207            // TODO: User Custom Error Handler
208            panic!("Failed to push task to local queue: {:?}", h);
209        }
210    }
211
212    pub fn run(&self) {
213        loop {
214            // 1. 本地 pop
215            if let Some(task) = self.worker.pop() {
216                self.run_task(task);
217                continue
218            }
219
220            // 2. 遍历全局 STEALER_POOL 窃取
221            if let Some(task) = self.try_steal() {
222                self.run_task(task);
223                continue
224            }
225
226            // 防止过度争抢总线
227            spin_loop();
228        }
229    }
230
231    fn try_steal(&self) -> Option<*mut TaskHeader> {
232        for (id, stealer) in STEALER_POOL.iter().enumerate() {
233            if id == self.core_id { continue }
234
235            // 尝试从其他核心偷取一个直接执行
236            if let Ok((task, _)) = stealer.steal_and_pop(&self.worker, |_| 1) {
237                return Some(task);
238            }
239        }
240        None
241    }
242
243    fn run_task(&self, ptr: *mut TaskHeader) {
244        let head = unsafe { &*ptr };
245
246        if head.state.compare_exchange(
247            State::Ready.into(), State::Running.into(), Ordering::Acquire, Ordering::Relaxed
248        ).is_err() { todo!("Failed to change task state") }
249
250        // SAFETY: TaskHeader size equaled.
251        let is_ready = unsafe { (head.poll)(ptr) };
252        if is_ready {
253            if head.state.compare_exchange(
254                State::Running.into(), State::Unreachable.into(), Ordering::Acquire, Ordering::Relaxed
255            ).is_err() { todo!("Failed to change task state") }
256        }
257    }
258}