maple_runtime/scheduler/
mod.rs1use crate::config::SchedulingConfig;
4use crate::runtime_core::{DeferralReason, RejectionReason, ScheduleHandle, TaskId};
5use crate::types::*;
6use std::cmp::Ordering;
7use std::collections::{BinaryHeap, HashMap};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11pub struct ResonanceScheduler {
13 queues: Arc<RwLock<HashMap<AttentionClass, PriorityQueue>>>,
15
16 circuit_breakers: Arc<RwLock<HashMap<ResonatorId, CircuitBreaker>>>,
18
19 config: SchedulingConfig,
21}
22
23impl ResonanceScheduler {
24 pub fn new(config: &SchedulingConfig) -> Self {
25 let mut queues = HashMap::new();
26 queues.insert(AttentionClass::Critical, PriorityQueue::new());
27 queues.insert(AttentionClass::High, PriorityQueue::new());
28 queues.insert(AttentionClass::Normal, PriorityQueue::new());
29 queues.insert(AttentionClass::Low, PriorityQueue::new());
30
31 Self {
32 queues: Arc::new(RwLock::new(queues)),
33 circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
34 config: config.clone(),
35 }
36 }
37
38 pub async fn schedule(&self, task: ResonanceTask) -> ScheduleHandle {
42 let circuit_breakers = self.circuit_breakers.read().await;
44 if let Some(breaker) = circuit_breakers.get(&task.target) {
45 if breaker.is_open() {
46 tracing::warn!("Circuit breaker open for {}", task.target);
47 return ScheduleHandle::rejected(RejectionReason::CircuitOpen);
48 }
49 }
50 drop(circuit_breakers);
51
52 let attention_class = self.classify_attention(&task);
54
55 if !self.has_attention_for(&task) {
57 tracing::debug!("Attention unavailable for task {}", task.id);
58 return ScheduleHandle::deferred(DeferralReason::AttentionUnavailable);
59 }
60
61 let mut queues = self.queues.write().await;
63 let queue = queues.get_mut(&attention_class).unwrap();
64
65 if queue.is_full(self.config.max_queue_size) {
66 tracing::warn!("Queue full for attention class {:?}", attention_class);
67 return ScheduleHandle::rejected(RejectionReason::QueueFull);
68 }
69
70 queue.push(task.clone());
72
73 tracing::debug!(
74 "Scheduled task {} for {} (class: {:?})",
75 task.id,
76 task.target,
77 attention_class
78 );
79
80 ScheduleHandle::scheduled(task.id)
81 }
82
83 fn classify_attention(&self, task: &ResonanceTask) -> AttentionClass {
85 task.attention_class
87 }
88
89 fn has_attention_for(&self, _task: &ResonanceTask) -> bool {
91 true
93 }
94
95 pub async fn trip_circuit_breaker(&self, resonator: ResonatorId) {
97 let mut breakers = self.circuit_breakers.write().await;
98 let breaker = breakers
99 .entry(resonator)
100 .or_insert_with(|| CircuitBreaker::new(self.config.circuit_breaker_threshold));
101
102 breaker.trip();
103 tracing::warn!("Circuit breaker tripped for {}", resonator);
104 }
105
106 pub async fn reset_circuit_breaker(&self, resonator: &ResonatorId) {
108 let mut breakers = self.circuit_breakers.write().await;
109 if let Some(breaker) = breakers.get_mut(resonator) {
110 breaker.reset();
111 tracing::info!("Circuit breaker reset for {}", resonator);
112 }
113 }
114
115 pub async fn next_task(&self) -> Option<ResonanceTask> {
117 let mut queues = self.queues.write().await;
118
119 for class in &[
121 AttentionClass::Critical,
122 AttentionClass::High,
123 AttentionClass::Normal,
124 AttentionClass::Low,
125 ] {
126 if let Some(queue) = queues.get_mut(class) {
127 if let Some(task) = queue.pop() {
128 return Some(task);
129 }
130 }
131 }
132
133 None
134 }
135}
136
137#[derive(Debug, Clone)]
139pub struct ResonanceTask {
140 pub id: TaskId,
141 pub target: ResonatorId,
142 pub attention_class: AttentionClass,
143 pub priority: u32,
144 pub payload: TaskPayload,
145}
146
147impl ResonanceTask {
148 pub fn new(target: ResonatorId, attention_class: AttentionClass, payload: TaskPayload) -> Self {
149 Self {
150 id: TaskId::generate(),
151 target,
152 attention_class,
153 priority: 0,
154 payload,
155 }
156 }
157}
158
159#[derive(Debug, Clone)]
161pub enum TaskPayload {
162 ProcessCoupling(CouplingId),
163 FormMeaning,
164 StabilizeIntent,
165 ExecuteCommitment(CommitmentId),
166}
167
168struct PriorityQueue {
170 heap: BinaryHeap<PrioritizedTask>,
171}
172
173impl PriorityQueue {
174 fn new() -> Self {
175 Self {
176 heap: BinaryHeap::new(),
177 }
178 }
179
180 fn push(&mut self, task: ResonanceTask) {
181 self.heap.push(PrioritizedTask(task));
182 }
183
184 fn pop(&mut self) -> Option<ResonanceTask> {
185 self.heap.pop().map(|pt| pt.0)
186 }
187
188 fn is_full(&self, max_size: usize) -> bool {
189 self.heap.len() >= max_size
190 }
191}
192
193struct PrioritizedTask(ResonanceTask);
195
196impl Ord for PrioritizedTask {
197 fn cmp(&self, other: &Self) -> Ordering {
198 self.0.priority.cmp(&other.0.priority)
199 }
200}
201
202impl PartialOrd for PrioritizedTask {
203 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
204 Some(self.cmp(other))
205 }
206}
207
208impl Eq for PrioritizedTask {}
209
210impl PartialEq for PrioritizedTask {
211 fn eq(&self, other: &Self) -> bool {
212 self.0.priority == other.0.priority
213 }
214}
215
216struct CircuitBreaker {
218 state: CircuitBreakerState,
219 failure_threshold: u32,
220 failure_count: u32,
221}
222
/// State of a [`CircuitBreaker`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// `HalfOpen` is declared but never constructed by the code in this file, which
// would otherwise trigger the dead-code lint.
#[allow(dead_code)]
enum CircuitBreakerState {
    /// Initial state, and the state after a reset.
    Closed,
    /// Entered once the failure count reaches the threshold.
    Open,
    /// Not assigned anywhere in this file.
    HalfOpen,
}
230
231impl CircuitBreaker {
232 fn new(failure_threshold: u32) -> Self {
233 Self {
234 state: CircuitBreakerState::Closed,
235 failure_threshold,
236 failure_count: 0,
237 }
238 }
239
240 fn is_open(&self) -> bool {
241 self.state == CircuitBreakerState::Open
242 }
243
244 fn trip(&mut self) {
245 self.failure_count += 1;
246 if self.failure_count >= self.failure_threshold {
247 self.state = CircuitBreakerState::Open;
248 }
249 }
250
251 fn reset(&mut self) {
252 self.state = CircuitBreakerState::Closed;
253 self.failure_count = 0;
254 }
255}