onion_vm/lambda/scheduler/async_scheduler.rs
1//! 异步任务调度器(AsyncScheduler)实现:用于管理和调度一组可运行对象(Runnable)。
2//!
3//! 该模块实现了基于优先级的协作式多任务调度,支持任务的动态优先级调整和新任务的生成。
4//! AsyncScheduler 作为 Onion 虚拟机的顶层调度单元,负责按优先级推进任务队列、处理返回值、错误和新任务的生成。
5use arc_gc::gc::GC;
6use std::{collections::VecDeque, sync::Arc};
7
8use crate::{
9 lambda::runnable::{Runnable, RuntimeError, StepResult},
10 types::{
11 async_handle::OnionAsyncHandle,
12 object::{GCArcStorage, OnionObjectCell, OnionObjectExt},
13 },
14 unwrap_step_result,
15};
16
17/// 优先级级别数量减一,用于限制最大优先级
18const NUM_PRIORITY_LEVELS: usize = 3;
19
20/// 生成调度步长,基于 2^(n+1)-1 序列
21///
22/// 这个函数为给定的优先级生成调度间隔。优先级越高(数值越大),
23/// 调度间隔越长,任务被执行的频率越低。
24///
25/// # 参数
26/// * `n` - 优先级级别(0表示最高优先级)
27///
28/// # 返回值
29/// 返回该优先级对应的调度间隔步数
30///
31/// # 示例
32/// - 优先级0: 2^1-1 = 1 (每步都执行)
33/// - 优先级1: 2^2-1 = 3 (每3步执行一次)
34/// - 优先级2: 2^3-1 = 7 (每7步执行一次)
35#[inline(always)]
36fn generate_sched_step(n: usize) -> u64 {
37 (1u64 << (n + 1)) - 1
38}
39
40/// 表示调度器中的一个任务
41///
42/// 每个任务包含可执行的代码、任务处理器和优先级信息。
43/// 优先级决定了任务被调度的频率,数值越大表示优先级越低。
44pub struct Task {
45 /// 可执行的任务代码
46 runnable: Box<dyn Runnable>,
47 /// 异步任务处理器,用于设置任务结果和管理任务状态
48 task_handler: (Arc<OnionAsyncHandle>, GCArcStorage),
49 /// 任务优先级,0表示最高优先级,数值越大优先级越低
50 priority: usize,
51}
52
53impl Task {
54 /// 创建一个新的任务
55 ///
56 /// # 参数
57 /// * `runnable` - 实现了 Runnable trait 的可执行对象
58 /// * `task_handler` - 异步任务处理器,包含 AsyncHandle 和 GC 存储
59 /// * `priority` - 任务的初始优先级,0表示最高优先级
60 ///
61 /// # 返回值
62 /// 返回新创建的 Task 实例
63 pub fn new(
64 runnable: Box<dyn Runnable>,
65 task_handler: (Arc<OnionAsyncHandle>, GCArcStorage),
66 priority: usize,
67 ) -> Self {
68 Self {
69 runnable,
70 task_handler,
71 priority,
72 }
73 }
74}
75
76/// 异步任务调度器
77///
78/// 实现了基于优先级的协作式多任务调度。任务按照优先级和调度间隔被执行,
79/// 优先级越高的任务执行频率越高。调度器使用轮询方式处理任务队列,
80/// 支持任务的动态优先级调整和新任务的生成。
81///
82/// # 调度策略
83/// - 使用单队列存储所有任务
84/// - 基于 2^(n+1)-1 序列确定调度间隔
85/// - 当任务pending时自动降低优先级
86/// - 支持任务完成时的结果设置
87///
88/// # 优先级系统
89/// - 0: 最高优先级,每步都执行
90/// - 1: 中等优先级,每3步执行一次
91/// - 2: 低优先级,每7步执行一次
92/// - 3: 最低优先级,每15步执行一次
93pub struct AsyncScheduler {
94 /// 任务队列,使用双端队列实现高效的入队出队操作
95 queue: VecDeque<Task>,
96 /// 主任务处理器,用于处理调度器完成时的结果
97 main_task_handler: (Arc<OnionAsyncHandle>, GCArcStorage),
98 /// 当前调度步数,用于计算任务的执行时机
99 step: u64,
100}
101
102impl AsyncScheduler {
103 /// 创建一个新的异步调度器
104 ///
105 /// # 参数
106 /// * `main_task` - 主要任务,调度器会首先执行此任务
107 ///
108 /// # 返回值
109 /// 返回新创建的 AsyncScheduler 实例,主任务已添加到队列中
110 pub fn new(main_task: Task) -> Self {
111 let mut queue = VecDeque::new();
112 let main_task_handler = main_task.task_handler.clone();
113 queue.push_back(main_task);
114 AsyncScheduler {
115 queue,
116 main_task_handler,
117 step: 0,
118 }
119 }
120}
121
122impl Runnable for AsyncScheduler {
123 /// 执行一个调度步骤
124 ///
125 /// 这是调度器的核心方法,负责遍历任务队列并按优先级调度执行任务。
126 /// 每次调用会增加步数计数器,并检查每个任务是否应该在当前步执行。
127 ///
128 /// # 参数
129 /// * `gc` - 垃圾收集器的可变引用,用于内存管理
130 ///
131 /// # 返回值
132 /// * `StepResult::Continue` - 还有任务需要继续执行
133 /// * `StepResult::Return(_)` - 所有任务完成,返回主任务的结果
134 /// * `StepResult::Error(_)` - 执行过程中出现错误
135 ///
136 /// # 调度逻辑
137 /// 1. 检查队列是否为空,为空则返回主任务结果
138 /// 2. 增加步数计数器
139 /// 3. 遍历队列中的每个任务
140 /// 4. 根据任务优先级和当前步数决定是否执行
141 /// 5. 处理任务执行结果:完成、继续、挂起或错误
142 /// 6. 动态调整任务优先级和队列位置
143 fn step(&mut self, gc: &mut GC<OnionObjectCell>) -> StepResult {
144 // 单队列调度:遍历队列,按步数调度
145 let len = self.queue.len();
146 if len == 0 {
147 // 所有任务都已完成,此时对 main_task_handler 执行valueof
148 return StepResult::Return(
149 unwrap_step_result!(self.main_task_handler.0.value_of()).into(),
150 );
151 }
152 let mut i = 0;
153 self.step += 1;
154 while i < len {
155 if let Some(mut task) = self.queue.pop_front() {
156 // 只有 step % generate_sched_step(priority) == 0 时才调度
157 if self.step % generate_sched_step(task.priority) == 0 {
158 let step_result = task.runnable.step(gc);
159 match step_result {
160 StepResult::Continue => {
161 task.priority = 0; // 重置优先级
162 self.queue.push_back(task);
163 }
164 StepResult::Return(ref result) => {
165 unwrap_step_result!(task.task_handler.0.set_result(result.weak()));
166 }
167 StepResult::Error(RuntimeError::Pending) => {
168 // 一旦pending立即降级
169 task.priority = std::cmp::min(task.priority + 1, NUM_PRIORITY_LEVELS);
170 self.queue.push_back(task);
171 }
172 e @ StepResult::Error(_) => return e,
173 StepResult::NewRunnable(_) => {
174 // AsyncScheduler 不支持 NewRunnable 因为它没有意义,出现 NewRunnable 就意味着逻辑有问题
175 return StepResult::Error(RuntimeError::DetailedError(
176 "AsyncScheduler does not support NewRunnable"
177 .to_string()
178 .into(),
179 ));
180 }
181 StepResult::ReplaceRunnable(_) => {
182 // 同上
183 return StepResult::Error(RuntimeError::DetailedError(
184 "AsyncScheduler does not support ReplaceRunnable"
185 .to_string()
186 .into(),
187 ));
188 }
189 StepResult::SpawnRunnable(new_task) => {
190 self.queue.push_back(*new_task);
191 self.queue.push_back(task);
192 }
193 }
194 } else {
195 // 未到调度步,放回队尾
196 self.queue.push_back(task);
197 }
198 }
199 i += 1;
200 }
201 StepResult::Continue
202 }
203
204 /// 接收来自其他 Runnable 的结果
205 ///
206 /// AsyncScheduler 不支持接收外部结果,因为它是顶层调度器。
207 /// 任何尝试向调度器发送结果的操作都会返回错误。
208 ///
209 /// # 参数
210 /// * `_step_result` - 被忽略的步骤结果
211 /// * `_gc` - 被忽略的垃圾收集器引用
212 ///
213 /// # 返回值
214 /// 总是返回 `RuntimeError::DetailedError`,表示不支持此操作
215 fn receive(
216 &mut self,
217 _step_result: &StepResult,
218 _gc: &mut GC<OnionObjectCell>,
219 ) -> Result<(), RuntimeError> {
220 Err(RuntimeError::DetailedError(
221 "AsyncScheduler does not support receive".into(),
222 ))
223 }
224
225 /// 格式化调度器的当前上下文信息
226 ///
227 /// 生成包含调度器状态和所有任务详细信息的格式化字符串,
228 /// 用于调试和监控调度器的运行状态。
229 ///
230 /// # 返回值
231 /// 返回包含以下信息的格式化字符串:
232 /// - 调度器当前状态(步数、队列长度)
233 /// - 每个任务的详细信息(优先级、下次执行步数、类型)
234 /// - 每个任务内部的上下文信息(缩进显示)
235 ///
236 /// # 输出格式
237 /// ```
238 /// -> AsyncScheduler Status:
239 /// - Current Step: 42
240 /// - Total Tasks in Queue: 3
241 /// --- Task Queue Details ---
242 /// [Task #0] Priority: 1 (Next run at step 45), Type: SomeRunnable
243 /// ... (task internal context)
244 /// [Task #1] Priority: 0 (Next run at step 43), Type: AnotherRunnable
245 /// ... (task internal context)
246 /// ```
247 fn format_context(&self) -> String {
248 let mut output = Vec::new();
249
250 // 1. 调度器自身的状态
251 output.push(format!(
252 "-> AsyncScheduler Status:\n - Current Step: {}\n - Total Tasks in Queue: {}",
253 self.step,
254 self.queue.len()
255 ));
256
257 // 2. 遍历队列中的所有任务
258 if self.queue.is_empty() {
259 output.push(" - Queue is empty.".to_string());
260 } else {
261 output.push("--- Task Queue Details ---".to_string());
262 for (index, task) in self.queue.iter().enumerate() {
263 // 下一次轮到该任务执行的步数
264 let next_run_step = {
265 let sched_interval = generate_sched_step(task.priority);
266 // 计算下一个能被 sched_interval 整除的 step
267 if self.step % sched_interval == 0 {
268 self.step // 就是当前步
269 } else {
270 self.step - (self.step % sched_interval) + sched_interval
271 }
272 };
273
274 // 获取 Runnable 的类型名
275 let runnable_type = std::any::type_name_of_val(&*task.runnable);
276
277 // 3. 为每个任务创建一个摘要条目
278 let task_summary = format!(
279 " [Task #{}] Priority: {} (Next run at step {}), Type: {}",
280 index,
281 task.priority,
282 next_run_step,
283 runnable_type.split("::").last().unwrap_or(runnable_type) // 简化类型名显示
284 );
285 output.push(task_summary);
286
287 // 4. 获取并缩进该任务内部的上下文
288 let inner_context = task.runnable.format_context();
289 for line in inner_context.lines() {
290 // 为内部上下文的每一行添加缩进,以保持层次结构清晰
291 output.push(format!(" {}", line));
292 }
293 }
294 }
295
296 output.join("\n")
297 }
298}