Skip to main content

sparreal_kernel/os/async/
executor.rs

1//! 单CPU异步执行器
2//!
3//! 基于embassy设计的单CPU异步执行器,支持任务优先级调度。
4//! 特性:
5//! - Wake任务优先执行
6//! - 超过1秒未执行的任务获得优先级提升
7//! - 使用alloc::进行动态内存分配
8//! - 使用IrqSpinlock保证中断安全
9
10use core::future::Future;
11use core::time::Duration;
12
13use alloc::boxed::Box;
14use alloc::collections::BTreeMap;
15use alloc::collections::BinaryHeap;
16use alloc::collections::VecDeque;
17use alloc::sync::Arc;
18
19use crate::os::sync::spinlock::IrqSpinlock;
20
21use super::task::{TaskHandle, TaskId, TaskMetadata, TaskPriority, TaskRef};
22
23/// 全局任务唤醒队列
24static GLOBAL_WAKEUP_QUEUE: IrqSpinlock<VecDeque<TaskId>> = IrqSpinlock::new(VecDeque::new());
25
26/// 将任务ID添加到全局唤醒队列
27pub fn enqueue_task_wakeup(task_id: TaskId) {
28    let mut queue = GLOBAL_WAKEUP_QUEUE.lock();
29    if !queue.contains(&task_id) {
30        queue.push_back(task_id);
31    }
32}
33
34/// 单CPU异步执行器
35pub struct SingleCpuExecutor {
36    /// 任务优先级队列
37    priority_task_queue: IrqSpinlock<BinaryHeap<PriorityTaskWrapper>>,
38    /// 任务注册表(ID -> TaskRef)
39    active_task_registry: IrqSpinlock<alloc::collections::BTreeMap<TaskId, Arc<TaskRef>>>,
40    /// 执行器运行状态
41    executor_running: IrqSpinlock<bool>,
42    /// 任务超时阈值(毫秒)
43    task_timeout_milliseconds: u64,
44}
45
46/// 优先级任务包装器,用于优先级队列
47#[derive(Debug)]
48struct PriorityTaskWrapper {
49    /// 任务优先级信息
50    task_priority: TaskPriority,
51    /// 任务引用
52    task_reference: Arc<TaskRef>,
53}
54
55impl PriorityTaskWrapper {
56    /// 创建新的优先级任务包装器
57    fn new(task_ref: Arc<TaskRef>) -> Self {
58        Self {
59            task_priority: task_ref.priority(),
60            task_reference: task_ref,
61        }
62    }
63}
64
65// 实现排序:优先级高的在前,时间戳小的在前,ID小的在前
66impl PartialEq for PriorityTaskWrapper {
67    fn eq(&self, other: &Self) -> bool {
68        self.task_priority == other.task_priority
69    }
70}
71
72impl Eq for PriorityTaskWrapper {}
73
74impl PartialOrd for PriorityTaskWrapper {
75    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
76        Some(self.cmp(other))
77    }
78}
79
80impl Ord for PriorityTaskWrapper {
81    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
82        // 注意:BinaryHeap是最大堆,所以需要反向比较
83        other.task_priority.cmp(&self.task_priority)
84    }
85}
86
87impl SingleCpuExecutor {
88    /// 创建新的执行器实例
89    pub fn new() -> Self {
90        Self::with_timeout(Duration::from_secs(1))
91    }
92
93    /// 使用自定义超时时间创建执行器
94    pub fn with_timeout(timeout: Duration) -> Self {
95        Self {
96            priority_task_queue: IrqSpinlock::new(BinaryHeap::new()),
97            active_task_registry: IrqSpinlock::new(BTreeMap::new()),
98            executor_running: IrqSpinlock::new(false),
99            task_timeout_milliseconds: timeout.as_millis() as u64,
100        }
101    }
102
103    /// 获取全局执行器实例
104    pub fn global() -> &'static Self {
105        use core::sync::atomic::{AtomicPtr, Ordering};
106
107        static EXECUTOR_PTR: AtomicPtr<SingleCpuExecutor> = AtomicPtr::new(core::ptr::null_mut());
108
109        let ptr = EXECUTOR_PTR.load(Ordering::Acquire);
110        if ptr.is_null() {
111            // 创建新的执行器实例
112            let executor = Box::leak(Box::new(SingleCpuExecutor::new()));
113            match EXECUTOR_PTR.compare_exchange(
114                core::ptr::null_mut(),
115                executor,
116                Ordering::AcqRel,
117                Ordering::Acquire,
118            ) {
119                Ok(_) => executor,
120                Err(existing) => {
121                    // 其他线程已经创建了执行器,使用现有的
122                    unsafe {
123                        // 需要处理内存泄漏问题,这里简化处理
124                        let _ = Box::from_raw(executor);
125                    }
126                    unsafe { &*existing }
127                }
128            }
129        } else {
130            unsafe { &*ptr }
131        }
132    }
133
134    /// 生成异步任务
135    pub fn spawn<F, T>(&self, future: F) -> TaskHandle
136    where
137        F: Future<Output = T> + Send + 'static,
138        T: Send + 'static,
139    {
140        let task_id = TaskId::new();
141        let metadata = TaskMetadata::new(task_id);
142
143        // 转换Future为返回()的Future
144        let wrapped_future = async move {
145            let _ = future.await;
146        };
147
148        let task_ref = Arc::new(TaskRef::new(wrapped_future, metadata));
149        let handle = TaskHandle::with_ref(task_id, task_ref.clone());
150
151        // 注册任务
152        {
153            let mut registry = self.active_task_registry.lock();
154            registry.insert(task_id, task_ref);
155        }
156
157        // 添加到任务队列
158        self.add_task_to_queue(task_id);
159
160        debug!("Spawned task {task_id:?}",);
161        handle
162    }
163
164    /// 添加任务到调度队列
165    fn add_task_to_queue(&self, task_id: TaskId) {
166        let registry = self.active_task_registry.lock();
167        if let Some(task_ref) = registry.get(&task_id) {
168            let priority_task = PriorityTaskWrapper::new(task_ref.clone());
169            let mut queue = self.priority_task_queue.lock();
170            queue.push(priority_task);
171        }
172    }
173
174    /// 唤醒指定任务
175    pub fn wake_by_id(&self, task_id: TaskId) -> bool {
176        let registry = self.active_task_registry.lock();
177        if let Some(task_ref) = registry.get(&task_id) {
178            task_ref.metadata.lock().mark_woken();
179            self.add_task_to_queue(task_id);
180            true
181        } else {
182            false
183        }
184    }
185
186    /// 处理单个任务
187    fn process_one_task(&self) -> bool {
188        let priority_task = {
189            let mut queue = self.priority_task_queue.lock();
190            queue.pop()
191        };
192
193        if let Some(priority_task) = priority_task {
194            let task_ref = priority_task.task_reference.clone();
195            let task_id = task_ref.id();
196
197            // 检查任务是否已完成
198            if task_ref.is_completed() {
199                // 清理已完成的任务
200                let mut registry = self.active_task_registry.lock();
201                registry.remove(&task_id);
202                debug!("Task {task_id:?} completed and cleaned up");
203                return true;
204            }
205
206            // 检查任务状态和是否需要执行
207            let should_execute = {
208                let mut metadata = task_ref.metadata.lock();
209
210                // 唤醒状态的任务直接执行
211                if metadata.state == super::task::TaskState::Woken {
212                    true
213                } else if metadata.is_expired(self.task_timeout_milliseconds) {
214                    // 超时的任务提升优先级并执行
215                    metadata.mark_woken();
216                    debug!("Task {task_id:?} expired, promoting priority");
217                    true
218                } else {
219                    // Pending 状态且未超时的任务,不执行
220                    false
221                }
222            };
223
224            if !should_execute {
225                // 不执行的任务,不重新排队(等待被 wake 唤醒)
226                // 直接返回 true 表示处理了一个任务(跳过它)
227                return true;
228            }
229
230            // 创建Waker并轮询任务
231            let waker = ExecutorWaker::new(task_id);
232            let waker = waker.into_waker();
233
234            match task_ref.poll(&waker) {
235                core::task::Poll::Ready(()) => {
236                    // 任务已完成,清理注册表
237                    let mut registry = self.active_task_registry.lock();
238                    registry.remove(&task_id);
239                    debug!("Task {task_id:?} completed");
240                    true
241                }
242                core::task::Poll::Pending => {
243                    // 任务返回Pending,不需要重新排队
244                    // 等待Waker被调用后再加入队列
245                    debug!("Task {task_id:?} pending, waiting for wake");
246                    true
247                }
248            }
249        } else {
250            false // 没有任务可处理
251        }
252    }
253
254    /// 运行一次任务调度
255    pub fn tick(&self) {
256        // 首先处理唤醒队列中的任务
257        self.process_wake_queue();
258
259        // 处理多个任务,直到队列为空或达到合理限制
260        let mut processed = 0;
261        const MAX_TASKS_PER_TICK: usize = 10;
262
263        while processed < MAX_TASKS_PER_TICK && self.process_one_task() {
264            processed += 1;
265        }
266
267        if processed == 0 {
268            log::debug!("No tasks to process in this tick");
269        }
270    }
271
272    /// 处理唤醒队列
273    fn process_wake_queue(&self) {
274        loop {
275            let task_id = {
276                let mut queue = GLOBAL_WAKEUP_QUEUE.lock();
277                queue.pop_front()
278            };
279
280            if let Some(task_id) = task_id {
281                // 标记任务为唤醒状态并加入调度队列
282                let registry = self.active_task_registry.lock();
283                if let Some(task_ref) = registry.get(&task_id) {
284                    task_ref.metadata.lock().mark_woken();
285                    let priority_task = PriorityTaskWrapper::new(task_ref.clone());
286                    let mut queue = self.priority_task_queue.lock();
287                    queue.push(priority_task);
288                }
289            } else {
290                break;
291            }
292        }
293    }
294
295    /// 运行直到所有任务完成
296    pub fn run_until_completion(&self) {
297        *self.executor_running.lock() = true;
298
299        debug!("Executor started, running until completion");
300
301        while self.has_pending_tasks() {
302            self.tick();
303
304            // 简单的CPU让步,避免过度占用
305            for _ in 0..1000 {
306                core::hint::spin_loop();
307            }
308        }
309
310        *self.executor_running.lock() = false;
311        debug!("Executor finished, all tasks completed");
312    }
313
314    /// 检查是否有待处理的任务
315    pub fn has_pending_tasks(&self) -> bool {
316        // 检查唤醒队列
317        if !GLOBAL_WAKEUP_QUEUE.lock().is_empty() {
318            return true;
319        }
320        // 检查任务队列
321        if !self.priority_task_queue.lock().is_empty() {
322            return true;
323        }
324        // 检查是否有未完成的任务(可能在等待被唤醒)
325        let registry = self.active_task_registry.lock();
326        !registry.is_empty()
327    }
328
329    /// 获取当前任务数量
330    pub fn task_count(&self) -> usize {
331        self.active_task_registry.lock().len()
332    }
333
334    /// 获取队列中的任务数量
335    pub fn queued_task_count(&self) -> usize {
336        self.priority_task_queue.lock().len()
337    }
338
339    /// 检查执行器是否正在运行
340    pub fn is_running(&self) -> bool {
341        *self.executor_running.lock()
342    }
343}
344
345impl Default for SingleCpuExecutor {
346    fn default() -> Self {
347        Self::new()
348    }
349}
350
351/// 生成异步任务的便捷函数
352pub fn spawn<F, T>(future: F) -> TaskHandle
353where
354    F: Future<Output = T> + Send + 'static,
355    T: Send + 'static,
356{
357    SingleCpuExecutor::global().spawn(future)
358}
359
360/// 阻塞等待异步任务完成
361/// 注意:当前简化实现,仅支持()返回类型
362pub fn block_on<F>(future: F)
363where
364    F: Future<Output = ()> + Send + 'static,
365{
366    let executor = SingleCpuExecutor::global();
367    executor.spawn(future);
368    executor.run_until_completion();
369}
370
371/// 执行一次任务调度
372pub fn tick() {
373    SingleCpuExecutor::global().tick();
374}
375
376/// 检查是否有待处理的任务
377pub fn has_pending_tasks() -> bool {
378    SingleCpuExecutor::global().has_pending_tasks()
379}
380
381/// 获取当前任务数量
382pub fn task_count() -> usize {
383    SingleCpuExecutor::global().task_count()
384}
385
386/// Executor专用的Waker实现
387#[derive(Debug)]
388struct ExecutorWaker {
389    /// 任务ID
390    task_id: TaskId,
391}
392
393impl ExecutorWaker {
394    /// 创建新的执行器Waker
395    fn new(task_id: TaskId) -> Self {
396        Self { task_id }
397    }
398
399    /// 转换为标准库Waker
400    fn into_waker(self) -> core::task::Waker {
401        let arc = Arc::new(self);
402        core::task::Waker::from(arc)
403    }
404}
405
406impl alloc::task::Wake for ExecutorWaker {
407    fn wake(self: Arc<Self>) {
408        enqueue_task_wakeup(self.task_id);
409    }
410
411    fn wake_by_ref(self: &Arc<Self>) {
412        enqueue_task_wakeup(self.task_id);
413    }
414}