onion_vm/lambda/scheduler/
async_scheduler.rs

1//! 异步任务调度器(AsyncScheduler)实现:用于管理和调度一组可运行对象(Runnable)。
2//! 
3//! 该模块实现了基于优先级的协作式多任务调度,支持任务的动态优先级调整和新任务的生成。
4//! AsyncScheduler 作为 Onion 虚拟机的顶层调度单元,负责按优先级推进任务队列、处理返回值、错误和新任务的生成。
5use arc_gc::gc::GC;
6use std::{collections::VecDeque, sync::Arc};
7
8use crate::{
9    lambda::runnable::{Runnable, RuntimeError, StepResult},
10    types::{
11        async_handle::OnionAsyncHandle,
12        object::{GCArcStorage, OnionObjectCell, OnionObjectExt},
13    },
14    unwrap_step_result,
15};
16
17/// 优先级级别数量减一,用于限制最大优先级
18const NUM_PRIORITY_LEVELS: usize = 3;
19
20/// 生成调度步长,基于 2^(n+1)-1 序列
21///
22/// 这个函数为给定的优先级生成调度间隔。优先级越高(数值越大),
23/// 调度间隔越长,任务被执行的频率越低。
24///
25/// # 参数
26/// * `n` - 优先级级别(0表示最高优先级)
27///
28/// # 返回值
29/// 返回该优先级对应的调度间隔步数
30///
31/// # 示例
32/// - 优先级0: 2^1-1 = 1 (每步都执行)
33/// - 优先级1: 2^2-1 = 3 (每3步执行一次)
34/// - 优先级2: 2^3-1 = 7 (每7步执行一次)
35#[inline(always)]
36fn generate_sched_step(n: usize) -> u64 {
37    (1u64 << (n + 1)) - 1
38}
39
40/// 表示调度器中的一个任务
41///
42/// 每个任务包含可执行的代码、任务处理器和优先级信息。
43/// 优先级决定了任务被调度的频率,数值越大表示优先级越低。
44pub struct Task {
45    /// 可执行的任务代码
46    runnable: Box<dyn Runnable>,
47    /// 异步任务处理器,用于设置任务结果和管理任务状态
48    task_handler: (Arc<OnionAsyncHandle>, GCArcStorage),
49    /// 任务优先级,0表示最高优先级,数值越大优先级越低
50    priority: usize,
51}
52
53impl Task {
54    /// 创建一个新的任务
55    ///
56    /// # 参数
57    /// * `runnable` - 实现了 Runnable trait 的可执行对象
58    /// * `task_handler` - 异步任务处理器,包含 AsyncHandle 和 GC 存储
59    /// * `priority` - 任务的初始优先级,0表示最高优先级
60    ///
61    /// # 返回值
62    /// 返回新创建的 Task 实例
63    pub fn new(
64        runnable: Box<dyn Runnable>,
65        task_handler: (Arc<OnionAsyncHandle>, GCArcStorage),
66        priority: usize,
67    ) -> Self {
68        Self {
69            runnable,
70            task_handler,
71            priority,
72        }
73    }
74}
75
76/// 异步任务调度器
77///
78/// 实现了基于优先级的协作式多任务调度。任务按照优先级和调度间隔被执行,
79/// 优先级越高的任务执行频率越高。调度器使用轮询方式处理任务队列,
80/// 支持任务的动态优先级调整和新任务的生成。
81///
82/// # 调度策略
83/// - 使用单队列存储所有任务
84/// - 基于 2^(n+1)-1 序列确定调度间隔
85/// - 当任务pending时自动降低优先级
86/// - 支持任务完成时的结果设置
87///
88/// # 优先级系统
89/// - 0: 最高优先级,每步都执行
90/// - 1: 中等优先级,每3步执行一次
91/// - 2: 低优先级,每7步执行一次
92/// - 3: 最低优先级,每15步执行一次
93pub struct AsyncScheduler {
94    /// 任务队列,使用双端队列实现高效的入队出队操作
95    queue: VecDeque<Task>,
96    /// 主任务处理器,用于处理调度器完成时的结果
97    main_task_handler: (Arc<OnionAsyncHandle>, GCArcStorage),
98    /// 当前调度步数,用于计算任务的执行时机
99    step: u64,
100}
101
102impl AsyncScheduler {
103    /// 创建一个新的异步调度器
104    ///
105    /// # 参数
106    /// * `main_task` - 主要任务,调度器会首先执行此任务
107    ///
108    /// # 返回值
109    /// 返回新创建的 AsyncScheduler 实例,主任务已添加到队列中
110    pub fn new(main_task: Task) -> Self {
111        let mut queue = VecDeque::new();
112        let main_task_handler = main_task.task_handler.clone();
113        queue.push_back(main_task);
114        AsyncScheduler {
115            queue,
116            main_task_handler,
117            step: 0,
118        }
119    }
120}
121
122impl Runnable for AsyncScheduler {
123    /// 执行一个调度步骤
124    ///
125    /// 这是调度器的核心方法,负责遍历任务队列并按优先级调度执行任务。
126    /// 每次调用会增加步数计数器,并检查每个任务是否应该在当前步执行。
127    ///
128    /// # 参数
129    /// * `gc` - 垃圾收集器的可变引用,用于内存管理
130    ///
131    /// # 返回值
132    /// * `StepResult::Continue` - 还有任务需要继续执行
133    /// * `StepResult::Return(_)` - 所有任务完成,返回主任务的结果
134    /// * `StepResult::Error(_)` - 执行过程中出现错误
135    ///
136    /// # 调度逻辑
137    /// 1. 检查队列是否为空,为空则返回主任务结果
138    /// 2. 增加步数计数器
139    /// 3. 遍历队列中的每个任务
140    /// 4. 根据任务优先级和当前步数决定是否执行
141    /// 5. 处理任务执行结果:完成、继续、挂起或错误
142    /// 6. 动态调整任务优先级和队列位置
143    fn step(&mut self, gc: &mut GC<OnionObjectCell>) -> StepResult {
144        // 单队列调度:遍历队列,按步数调度
145        let len = self.queue.len();
146        if len == 0 {
147            // 所有任务都已完成,此时对 main_task_handler 执行valueof
148            return StepResult::Return(
149                unwrap_step_result!(self.main_task_handler.0.value_of()).into(),
150            );
151        }
152        let mut i = 0;
153        self.step += 1;
154        while i < len {
155            if let Some(mut task) = self.queue.pop_front() {
156                // 只有 step % generate_sched_step(priority) == 0 时才调度
157                if self.step % generate_sched_step(task.priority) == 0 {
158                    let step_result = task.runnable.step(gc);
159                    match step_result {
160                        StepResult::Continue => {
161                            task.priority = 0; // 重置优先级
162                            self.queue.push_back(task);
163                        }
164                        StepResult::Return(ref result) => {
165                            unwrap_step_result!(task.task_handler.0.set_result(result.weak()));
166                        }
167                        StepResult::Error(RuntimeError::Pending) => {
168                            // 一旦pending立即降级
169                            task.priority = std::cmp::min(task.priority + 1, NUM_PRIORITY_LEVELS);
170                            self.queue.push_back(task);
171                        }
172                        e @ StepResult::Error(_) => return e,
173                        StepResult::NewRunnable(_) => {
174                            // AsyncScheduler 不支持 NewRunnable 因为它没有意义,出现 NewRunnable 就意味着逻辑有问题
175                            return StepResult::Error(RuntimeError::DetailedError(
176                                "AsyncScheduler does not support NewRunnable"
177                                    .to_string()
178                                    .into(),
179                            ));
180                        }
181                        StepResult::ReplaceRunnable(_) => {
182                            // 同上
183                            return StepResult::Error(RuntimeError::DetailedError(
184                                "AsyncScheduler does not support ReplaceRunnable"
185                                    .to_string()
186                                    .into(),
187                            ));
188                        }
189                        StepResult::SpawnRunnable(new_task) => {
190                            self.queue.push_back(*new_task);
191                            self.queue.push_back(task);
192                        }
193                    }
194                } else {
195                    // 未到调度步,放回队尾
196                    self.queue.push_back(task);
197                }
198            }
199            i += 1;
200        }
201        StepResult::Continue
202    }
203
204    /// 接收来自其他 Runnable 的结果
205    ///
206    /// AsyncScheduler 不支持接收外部结果,因为它是顶层调度器。
207    /// 任何尝试向调度器发送结果的操作都会返回错误。
208    ///
209    /// # 参数
210    /// * `_step_result` - 被忽略的步骤结果
211    /// * `_gc` - 被忽略的垃圾收集器引用
212    ///
213    /// # 返回值
214    /// 总是返回 `RuntimeError::DetailedError`,表示不支持此操作
215    fn receive(
216        &mut self,
217        _step_result: &StepResult,
218        _gc: &mut GC<OnionObjectCell>,
219    ) -> Result<(), RuntimeError> {
220        Err(RuntimeError::DetailedError(
221            "AsyncScheduler does not support receive".into(),
222        ))
223    }
224
225    /// 格式化调度器的当前上下文信息
226    ///
227    /// 生成包含调度器状态和所有任务详细信息的格式化字符串,
228    /// 用于调试和监控调度器的运行状态。
229    ///
230    /// # 返回值
231    /// 返回包含以下信息的格式化字符串:
232    /// - 调度器当前状态(步数、队列长度)
233    /// - 每个任务的详细信息(优先级、下次执行步数、类型)
234    /// - 每个任务内部的上下文信息(缩进显示)
235    ///
236    /// # 输出格式
237    /// ```
238    /// -> AsyncScheduler Status:
239    ///    - Current Step: 42
240    ///    - Total Tasks in Queue: 3
241    /// --- Task Queue Details ---
242    ///   [Task #0] Priority: 1 (Next run at step 45), Type: SomeRunnable
243    ///     ... (task internal context)
244    ///   [Task #1] Priority: 0 (Next run at step 43), Type: AnotherRunnable
245    ///     ... (task internal context)
246    /// ```
247    fn format_context(&self) -> String {
248        let mut output = Vec::new();
249
250        // 1. 调度器自身的状态
251        output.push(format!(
252            "-> AsyncScheduler Status:\n   - Current Step: {}\n   - Total Tasks in Queue: {}",
253            self.step,
254            self.queue.len()
255        ));
256
257        // 2. 遍历队列中的所有任务
258        if self.queue.is_empty() {
259            output.push("   - Queue is empty.".to_string());
260        } else {
261            output.push("--- Task Queue Details ---".to_string());
262            for (index, task) in self.queue.iter().enumerate() {
263                // 下一次轮到该任务执行的步数
264                let next_run_step = {
265                    let sched_interval = generate_sched_step(task.priority);
266                    // 计算下一个能被 sched_interval 整除的 step
267                    if self.step % sched_interval == 0 {
268                        self.step // 就是当前步
269                    } else {
270                        self.step - (self.step % sched_interval) + sched_interval
271                    }
272                };
273
274                // 获取 Runnable 的类型名
275                let runnable_type = std::any::type_name_of_val(&*task.runnable);
276
277                // 3. 为每个任务创建一个摘要条目
278                let task_summary = format!(
279                    "  [Task #{}] Priority: {} (Next run at step {}), Type: {}",
280                    index,
281                    task.priority,
282                    next_run_step,
283                    runnable_type.split("::").last().unwrap_or(runnable_type) // 简化类型名显示
284                );
285                output.push(task_summary);
286
287                // 4. 获取并缩进该任务内部的上下文
288                let inner_context = task.runnable.format_context();
289                for line in inner_context.lines() {
290                    // 为内部上下文的每一行添加缩进,以保持层次结构清晰
291                    output.push(format!("    {}", line));
292                }
293            }
294        }
295
296        output.join("\n")
297    }
298}