Skip to main content

maple_runtime/scheduler/
mod.rs

1//! Resonance Scheduler - attention-aware task scheduling
2
3use std::collections::{BinaryHeap, HashMap};
4use std::cmp::Ordering;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7use crate::types::*;
8use crate::runtime_core::{ScheduleHandle, TaskId, RejectionReason, DeferralReason};
9use crate::config::SchedulingConfig;
10
11/// Schedules resonance processing respecting attention budgets
12pub struct ResonanceScheduler {
13    /// Priority queues by attention class
14    queues: Arc<RwLock<HashMap<AttentionClass, PriorityQueue>>>,
15
16    /// Circuit breakers for overload protection
17    circuit_breakers: Arc<RwLock<HashMap<ResonatorId, CircuitBreaker>>>,
18
19    /// Configuration
20    config: SchedulingConfig,
21}
22
23impl ResonanceScheduler {
24    pub fn new(config: &SchedulingConfig) -> Self {
25        let mut queues = HashMap::new();
26        queues.insert(AttentionClass::Critical, PriorityQueue::new());
27        queues.insert(AttentionClass::High, PriorityQueue::new());
28        queues.insert(AttentionClass::Normal, PriorityQueue::new());
29        queues.insert(AttentionClass::Low, PriorityQueue::new());
30
31        Self {
32            queues: Arc::new(RwLock::new(queues)),
33            circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
34            config: config.clone(),
35        }
36    }
37
38    /// Schedule a resonance task
39    ///
40    /// This respects attention budgets and circuit breakers.
41    pub async fn schedule(&self, task: ResonanceTask) -> ScheduleHandle {
42        // Check circuit breaker
43        let circuit_breakers = self.circuit_breakers.read().await;
44        if let Some(breaker) = circuit_breakers.get(&task.target) {
45            if breaker.is_open() {
46                tracing::warn!("Circuit breaker open for {}", task.target);
47                return ScheduleHandle::rejected(RejectionReason::CircuitOpen);
48            }
49        }
50        drop(circuit_breakers);
51
52        // Determine attention class
53        let attention_class = self.classify_attention(&task);
54
55        // Check if attention is available (placeholder)
56        if !self.has_attention_for(&task) {
57            tracing::debug!("Attention unavailable for task {}", task.id);
58            return ScheduleHandle::deferred(DeferralReason::AttentionUnavailable);
59        }
60
61        // Check queue capacity
62        let mut queues = self.queues.write().await;
63        let queue = queues.get_mut(&attention_class).unwrap();
64
65        if queue.is_full(self.config.max_queue_size) {
66            tracing::warn!("Queue full for attention class {:?}", attention_class);
67            return ScheduleHandle::rejected(RejectionReason::QueueFull);
68        }
69
70        // Add to appropriate queue
71        queue.push(task.clone());
72
73        tracing::debug!(
74            "Scheduled task {} for {} (class: {:?})",
75            task.id,
76            task.target,
77            attention_class
78        );
79
80        ScheduleHandle::scheduled(task.id)
81    }
82
83    /// Classify attention requirements for a task
84    fn classify_attention(&self, task: &ResonanceTask) -> AttentionClass {
85        // In real implementation, would analyze task characteristics
86        task.attention_class
87    }
88
89    /// Check if attention is available (placeholder)
90    fn has_attention_for(&self, _task: &ResonanceTask) -> bool {
91        // In real implementation, would check attention allocator
92        true
93    }
94
95    /// Trip circuit breaker for a Resonator
96    pub async fn trip_circuit_breaker(&self, resonator: ResonatorId) {
97        let mut breakers = self.circuit_breakers.write().await;
98        let breaker = breakers
99            .entry(resonator)
100            .or_insert_with(|| CircuitBreaker::new(self.config.circuit_breaker_threshold));
101
102        breaker.trip();
103        tracing::warn!("Circuit breaker tripped for {}", resonator);
104    }
105
106    /// Reset circuit breaker for a Resonator
107    pub async fn reset_circuit_breaker(&self, resonator: &ResonatorId) {
108        let mut breakers = self.circuit_breakers.write().await;
109        if let Some(breaker) = breakers.get_mut(resonator) {
110            breaker.reset();
111            tracing::info!("Circuit breaker reset for {}", resonator);
112        }
113    }
114
115    /// Get next task to process (for worker)
116    pub async fn next_task(&self) -> Option<ResonanceTask> {
117        let mut queues = self.queues.write().await;
118
119        // Process in priority order: Critical > High > Normal > Low
120        for class in &[
121            AttentionClass::Critical,
122            AttentionClass::High,
123            AttentionClass::Normal,
124            AttentionClass::Low,
125        ] {
126            if let Some(queue) = queues.get_mut(class) {
127                if let Some(task) = queue.pop() {
128                    return Some(task);
129                }
130            }
131        }
132
133        None
134    }
135}
136
137/// A resonance task to be scheduled
138#[derive(Debug, Clone)]
139pub struct ResonanceTask {
140    pub id: TaskId,
141    pub target: ResonatorId,
142    pub attention_class: AttentionClass,
143    pub priority: u32,
144    pub payload: TaskPayload,
145}
146
147impl ResonanceTask {
148    pub fn new(
149        target: ResonatorId,
150        attention_class: AttentionClass,
151        payload: TaskPayload,
152    ) -> Self {
153        Self {
154            id: TaskId::generate(),
155            target,
156            attention_class,
157            priority: 0,
158            payload,
159        }
160    }
161}
162
163/// Task payload (placeholder)
164#[derive(Debug, Clone)]
165pub enum TaskPayload {
166    ProcessCoupling(CouplingId),
167    FormMeaning,
168    StabilizeIntent,
169    ExecuteCommitment(CommitmentId),
170}
171
172/// Priority queue for resonance tasks
173struct PriorityQueue {
174    heap: BinaryHeap<PrioritizedTask>,
175}
176
177impl PriorityQueue {
178    fn new() -> Self {
179        Self {
180            heap: BinaryHeap::new(),
181        }
182    }
183
184    fn push(&mut self, task: ResonanceTask) {
185        self.heap.push(PrioritizedTask(task));
186    }
187
188    fn pop(&mut self) -> Option<ResonanceTask> {
189        self.heap.pop().map(|pt| pt.0)
190    }
191
192    fn is_full(&self, max_size: usize) -> bool {
193        self.heap.len() >= max_size
194    }
195}
196
197/// Wrapper for priority ordering
198struct PrioritizedTask(ResonanceTask);
199
200impl Ord for PrioritizedTask {
201    fn cmp(&self, other: &Self) -> Ordering {
202        self.0.priority.cmp(&other.0.priority)
203    }
204}
205
206impl PartialOrd for PrioritizedTask {
207    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
208        Some(self.cmp(other))
209    }
210}
211
212impl Eq for PrioritizedTask {}
213
214impl PartialEq for PrioritizedTask {
215    fn eq(&self, other: &Self) -> bool {
216        self.0.priority == other.0.priority
217    }
218}
219
220/// Circuit breaker for overload protection
221struct CircuitBreaker {
222    state: CircuitBreakerState,
223    failure_threshold: u32,
224    failure_count: u32,
225}
226
227#[derive(Debug, Clone, Copy, PartialEq, Eq)]
228enum CircuitBreakerState {
229    Closed,
230    Open,
231    HalfOpen,
232}
233
234impl CircuitBreaker {
235    fn new(failure_threshold: u32) -> Self {
236        Self {
237            state: CircuitBreakerState::Closed,
238            failure_threshold,
239            failure_count: 0,
240        }
241    }
242
243    fn is_open(&self) -> bool {
244        self.state == CircuitBreakerState::Open
245    }
246
247    fn trip(&mut self) {
248        self.failure_count += 1;
249        if self.failure_count >= self.failure_threshold {
250            self.state = CircuitBreakerState::Open;
251        }
252    }
253
254    fn reset(&mut self) {
255        self.state = CircuitBreakerState::Closed;
256        self.failure_count = 0;
257    }
258}