Skip to main content

maple_runtime/scheduler/
mod.rs

1//! Resonance Scheduler - attention-aware task scheduling
2
3use crate::config::SchedulingConfig;
4use crate::runtime_core::{DeferralReason, RejectionReason, ScheduleHandle, TaskId};
5use crate::types::*;
6use std::cmp::Ordering;
7use std::collections::{BinaryHeap, HashMap};
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11/// Schedules resonance processing respecting attention budgets
12pub struct ResonanceScheduler {
13    /// Priority queues by attention class
14    queues: Arc<RwLock<HashMap<AttentionClass, PriorityQueue>>>,
15
16    /// Circuit breakers for overload protection
17    circuit_breakers: Arc<RwLock<HashMap<ResonatorId, CircuitBreaker>>>,
18
19    /// Configuration
20    config: SchedulingConfig,
21}
22
23impl ResonanceScheduler {
24    pub fn new(config: &SchedulingConfig) -> Self {
25        let mut queues = HashMap::new();
26        queues.insert(AttentionClass::Critical, PriorityQueue::new());
27        queues.insert(AttentionClass::High, PriorityQueue::new());
28        queues.insert(AttentionClass::Normal, PriorityQueue::new());
29        queues.insert(AttentionClass::Low, PriorityQueue::new());
30
31        Self {
32            queues: Arc::new(RwLock::new(queues)),
33            circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
34            config: config.clone(),
35        }
36    }
37
38    /// Schedule a resonance task
39    ///
40    /// This respects attention budgets and circuit breakers.
41    pub async fn schedule(&self, task: ResonanceTask) -> ScheduleHandle {
42        // Check circuit breaker
43        let circuit_breakers = self.circuit_breakers.read().await;
44        if let Some(breaker) = circuit_breakers.get(&task.target) {
45            if breaker.is_open() {
46                tracing::warn!("Circuit breaker open for {}", task.target);
47                return ScheduleHandle::rejected(RejectionReason::CircuitOpen);
48            }
49        }
50        drop(circuit_breakers);
51
52        // Determine attention class
53        let attention_class = self.classify_attention(&task);
54
55        // Check if attention is available (placeholder)
56        if !self.has_attention_for(&task) {
57            tracing::debug!("Attention unavailable for task {}", task.id);
58            return ScheduleHandle::deferred(DeferralReason::AttentionUnavailable);
59        }
60
61        // Check queue capacity
62        let mut queues = self.queues.write().await;
63        let queue = queues.get_mut(&attention_class).unwrap();
64
65        if queue.is_full(self.config.max_queue_size) {
66            tracing::warn!("Queue full for attention class {:?}", attention_class);
67            return ScheduleHandle::rejected(RejectionReason::QueueFull);
68        }
69
70        // Add to appropriate queue
71        queue.push(task.clone());
72
73        tracing::debug!(
74            "Scheduled task {} for {} (class: {:?})",
75            task.id,
76            task.target,
77            attention_class
78        );
79
80        ScheduleHandle::scheduled(task.id)
81    }
82
83    /// Classify attention requirements for a task
84    fn classify_attention(&self, task: &ResonanceTask) -> AttentionClass {
85        // In real implementation, would analyze task characteristics
86        task.attention_class
87    }
88
89    /// Check if attention is available (placeholder)
90    fn has_attention_for(&self, _task: &ResonanceTask) -> bool {
91        // In real implementation, would check attention allocator
92        true
93    }
94
95    /// Trip circuit breaker for a Resonator
96    pub async fn trip_circuit_breaker(&self, resonator: ResonatorId) {
97        let mut breakers = self.circuit_breakers.write().await;
98        let breaker = breakers
99            .entry(resonator)
100            .or_insert_with(|| CircuitBreaker::new(self.config.circuit_breaker_threshold));
101
102        breaker.trip();
103        tracing::warn!("Circuit breaker tripped for {}", resonator);
104    }
105
106    /// Reset circuit breaker for a Resonator
107    pub async fn reset_circuit_breaker(&self, resonator: &ResonatorId) {
108        let mut breakers = self.circuit_breakers.write().await;
109        if let Some(breaker) = breakers.get_mut(resonator) {
110            breaker.reset();
111            tracing::info!("Circuit breaker reset for {}", resonator);
112        }
113    }
114
115    /// Get next task to process (for worker)
116    pub async fn next_task(&self) -> Option<ResonanceTask> {
117        let mut queues = self.queues.write().await;
118
119        // Process in priority order: Critical > High > Normal > Low
120        for class in &[
121            AttentionClass::Critical,
122            AttentionClass::High,
123            AttentionClass::Normal,
124            AttentionClass::Low,
125        ] {
126            if let Some(queue) = queues.get_mut(class) {
127                if let Some(task) = queue.pop() {
128                    return Some(task);
129                }
130            }
131        }
132
133        None
134    }
135}
136
137/// A resonance task to be scheduled
138#[derive(Debug, Clone)]
139pub struct ResonanceTask {
140    pub id: TaskId,
141    pub target: ResonatorId,
142    pub attention_class: AttentionClass,
143    pub priority: u32,
144    pub payload: TaskPayload,
145}
146
147impl ResonanceTask {
148    pub fn new(target: ResonatorId, attention_class: AttentionClass, payload: TaskPayload) -> Self {
149        Self {
150            id: TaskId::generate(),
151            target,
152            attention_class,
153            priority: 0,
154            payload,
155        }
156    }
157}
158
159/// Task payload (placeholder)
160#[derive(Debug, Clone)]
161pub enum TaskPayload {
162    ProcessCoupling(CouplingId),
163    FormMeaning,
164    StabilizeIntent,
165    ExecuteCommitment(CommitmentId),
166}
167
168/// Priority queue for resonance tasks
169struct PriorityQueue {
170    heap: BinaryHeap<PrioritizedTask>,
171}
172
173impl PriorityQueue {
174    fn new() -> Self {
175        Self {
176            heap: BinaryHeap::new(),
177        }
178    }
179
180    fn push(&mut self, task: ResonanceTask) {
181        self.heap.push(PrioritizedTask(task));
182    }
183
184    fn pop(&mut self) -> Option<ResonanceTask> {
185        self.heap.pop().map(|pt| pt.0)
186    }
187
188    fn is_full(&self, max_size: usize) -> bool {
189        self.heap.len() >= max_size
190    }
191}
192
193/// Wrapper for priority ordering
194struct PrioritizedTask(ResonanceTask);
195
196impl Ord for PrioritizedTask {
197    fn cmp(&self, other: &Self) -> Ordering {
198        self.0.priority.cmp(&other.0.priority)
199    }
200}
201
202impl PartialOrd for PrioritizedTask {
203    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
204        Some(self.cmp(other))
205    }
206}
207
208impl Eq for PrioritizedTask {}
209
210impl PartialEq for PrioritizedTask {
211    fn eq(&self, other: &Self) -> bool {
212        self.0.priority == other.0.priority
213    }
214}
215
216/// Circuit breaker for overload protection
217struct CircuitBreaker {
218    state: CircuitBreakerState,
219    failure_threshold: u32,
220    failure_count: u32,
221}
222
223#[derive(Debug, Clone, Copy, PartialEq, Eq)]
224#[allow(dead_code)]
225enum CircuitBreakerState {
226    Closed,
227    Open,
228    HalfOpen,
229}
230
231impl CircuitBreaker {
232    fn new(failure_threshold: u32) -> Self {
233        Self {
234            state: CircuitBreakerState::Closed,
235            failure_threshold,
236            failure_count: 0,
237        }
238    }
239
240    fn is_open(&self) -> bool {
241        self.state == CircuitBreakerState::Open
242    }
243
244    fn trip(&mut self) {
245        self.failure_count += 1;
246        if self.failure_count >= self.failure_threshold {
247            self.state = CircuitBreakerState::Open;
248        }
249    }
250
251    fn reset(&mut self) {
252        self.state = CircuitBreakerState::Closed;
253        self.failure_count = 0;
254    }
255}