maple_runtime/scheduler/
mod.rs1use std::collections::{BinaryHeap, HashMap};
4use std::cmp::Ordering;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use crate::types::*;
8use crate::runtime_core::{ScheduleHandle, TaskId, RejectionReason, DeferralReason};
9use crate::config::SchedulingConfig;
10
11pub struct ResonanceScheduler {
13 queues: Arc<RwLock<HashMap<AttentionClass, PriorityQueue>>>,
15
16 circuit_breakers: Arc<RwLock<HashMap<ResonatorId, CircuitBreaker>>>,
18
19 config: SchedulingConfig,
21}
22
23impl ResonanceScheduler {
24 pub fn new(config: &SchedulingConfig) -> Self {
25 let mut queues = HashMap::new();
26 queues.insert(AttentionClass::Critical, PriorityQueue::new());
27 queues.insert(AttentionClass::High, PriorityQueue::new());
28 queues.insert(AttentionClass::Normal, PriorityQueue::new());
29 queues.insert(AttentionClass::Low, PriorityQueue::new());
30
31 Self {
32 queues: Arc::new(RwLock::new(queues)),
33 circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
34 config: config.clone(),
35 }
36 }
37
38 pub async fn schedule(&self, task: ResonanceTask) -> ScheduleHandle {
42 let circuit_breakers = self.circuit_breakers.read().await;
44 if let Some(breaker) = circuit_breakers.get(&task.target) {
45 if breaker.is_open() {
46 tracing::warn!("Circuit breaker open for {}", task.target);
47 return ScheduleHandle::rejected(RejectionReason::CircuitOpen);
48 }
49 }
50 drop(circuit_breakers);
51
52 let attention_class = self.classify_attention(&task);
54
55 if !self.has_attention_for(&task) {
57 tracing::debug!("Attention unavailable for task {}", task.id);
58 return ScheduleHandle::deferred(DeferralReason::AttentionUnavailable);
59 }
60
61 let mut queues = self.queues.write().await;
63 let queue = queues.get_mut(&attention_class).unwrap();
64
65 if queue.is_full(self.config.max_queue_size) {
66 tracing::warn!("Queue full for attention class {:?}", attention_class);
67 return ScheduleHandle::rejected(RejectionReason::QueueFull);
68 }
69
70 queue.push(task.clone());
72
73 tracing::debug!(
74 "Scheduled task {} for {} (class: {:?})",
75 task.id,
76 task.target,
77 attention_class
78 );
79
80 ScheduleHandle::scheduled(task.id)
81 }
82
83 fn classify_attention(&self, task: &ResonanceTask) -> AttentionClass {
85 task.attention_class
87 }
88
89 fn has_attention_for(&self, _task: &ResonanceTask) -> bool {
91 true
93 }
94
95 pub async fn trip_circuit_breaker(&self, resonator: ResonatorId) {
97 let mut breakers = self.circuit_breakers.write().await;
98 let breaker = breakers
99 .entry(resonator)
100 .or_insert_with(|| CircuitBreaker::new(self.config.circuit_breaker_threshold));
101
102 breaker.trip();
103 tracing::warn!("Circuit breaker tripped for {}", resonator);
104 }
105
106 pub async fn reset_circuit_breaker(&self, resonator: &ResonatorId) {
108 let mut breakers = self.circuit_breakers.write().await;
109 if let Some(breaker) = breakers.get_mut(resonator) {
110 breaker.reset();
111 tracing::info!("Circuit breaker reset for {}", resonator);
112 }
113 }
114
115 pub async fn next_task(&self) -> Option<ResonanceTask> {
117 let mut queues = self.queues.write().await;
118
119 for class in &[
121 AttentionClass::Critical,
122 AttentionClass::High,
123 AttentionClass::Normal,
124 AttentionClass::Low,
125 ] {
126 if let Some(queue) = queues.get_mut(class) {
127 if let Some(task) = queue.pop() {
128 return Some(task);
129 }
130 }
131 }
132
133 None
134 }
135}
136
137#[derive(Debug, Clone)]
139pub struct ResonanceTask {
140 pub id: TaskId,
141 pub target: ResonatorId,
142 pub attention_class: AttentionClass,
143 pub priority: u32,
144 pub payload: TaskPayload,
145}
146
147impl ResonanceTask {
148 pub fn new(
149 target: ResonatorId,
150 attention_class: AttentionClass,
151 payload: TaskPayload,
152 ) -> Self {
153 Self {
154 id: TaskId::generate(),
155 target,
156 attention_class,
157 priority: 0,
158 payload,
159 }
160 }
161}
162
163#[derive(Debug, Clone)]
165pub enum TaskPayload {
166 ProcessCoupling(CouplingId),
167 FormMeaning,
168 StabilizeIntent,
169 ExecuteCommitment(CommitmentId),
170}
171
172struct PriorityQueue {
174 heap: BinaryHeap<PrioritizedTask>,
175}
176
177impl PriorityQueue {
178 fn new() -> Self {
179 Self {
180 heap: BinaryHeap::new(),
181 }
182 }
183
184 fn push(&mut self, task: ResonanceTask) {
185 self.heap.push(PrioritizedTask(task));
186 }
187
188 fn pop(&mut self) -> Option<ResonanceTask> {
189 self.heap.pop().map(|pt| pt.0)
190 }
191
192 fn is_full(&self, max_size: usize) -> bool {
193 self.heap.len() >= max_size
194 }
195}
196
197struct PrioritizedTask(ResonanceTask);
199
200impl Ord for PrioritizedTask {
201 fn cmp(&self, other: &Self) -> Ordering {
202 self.0.priority.cmp(&other.0.priority)
203 }
204}
205
206impl PartialOrd for PrioritizedTask {
207 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
208 Some(self.cmp(other))
209 }
210}
211
212impl Eq for PrioritizedTask {}
213
214impl PartialEq for PrioritizedTask {
215 fn eq(&self, other: &Self) -> bool {
216 self.0.priority == other.0.priority
217 }
218}
219
220struct CircuitBreaker {
222 state: CircuitBreakerState,
223 failure_threshold: u32,
224 failure_count: u32,
225}
226
227#[derive(Debug, Clone, Copy, PartialEq, Eq)]
228enum CircuitBreakerState {
229 Closed,
230 Open,
231 HalfOpen,
232}
233
234impl CircuitBreaker {
235 fn new(failure_threshold: u32) -> Self {
236 Self {
237 state: CircuitBreakerState::Closed,
238 failure_threshold,
239 failure_count: 0,
240 }
241 }
242
243 fn is_open(&self) -> bool {
244 self.state == CircuitBreakerState::Open
245 }
246
247 fn trip(&mut self) {
248 self.failure_count += 1;
249 if self.failure_count >= self.failure_threshold {
250 self.state = CircuitBreakerState::Open;
251 }
252 }
253
254 fn reset(&mut self) {
255 self.state = CircuitBreakerState::Closed;
256 self.failure_count = 0;
257 }
258}