use crate::config::SchedulingConfig;
use crate::runtime_core::{DeferralReason, RejectionReason, ScheduleHandle, TaskId};
use crate::types::*;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct ResonanceScheduler {
queues: Arc<RwLock<HashMap<AttentionClass, PriorityQueue>>>,
circuit_breakers: Arc<RwLock<HashMap<ResonatorId, CircuitBreaker>>>,
config: SchedulingConfig,
}
impl ResonanceScheduler {
pub fn new(config: &SchedulingConfig) -> Self {
let mut queues = HashMap::new();
queues.insert(AttentionClass::Critical, PriorityQueue::new());
queues.insert(AttentionClass::High, PriorityQueue::new());
queues.insert(AttentionClass::Normal, PriorityQueue::new());
queues.insert(AttentionClass::Low, PriorityQueue::new());
Self {
queues: Arc::new(RwLock::new(queues)),
circuit_breakers: Arc::new(RwLock::new(HashMap::new())),
config: config.clone(),
}
}
pub async fn schedule(&self, task: ResonanceTask) -> ScheduleHandle {
let circuit_breakers = self.circuit_breakers.read().await;
if let Some(breaker) = circuit_breakers.get(&task.target) {
if breaker.is_open() {
tracing::warn!("Circuit breaker open for {}", task.target);
return ScheduleHandle::rejected(RejectionReason::CircuitOpen);
}
}
drop(circuit_breakers);
let attention_class = self.classify_attention(&task);
if !self.has_attention_for(&task) {
tracing::debug!("Attention unavailable for task {}", task.id);
return ScheduleHandle::deferred(DeferralReason::AttentionUnavailable);
}
let mut queues = self.queues.write().await;
let queue = queues.get_mut(&attention_class).unwrap();
if queue.is_full(self.config.max_queue_size) {
tracing::warn!("Queue full for attention class {:?}", attention_class);
return ScheduleHandle::rejected(RejectionReason::QueueFull);
}
queue.push(task.clone());
tracing::debug!(
"Scheduled task {} for {} (class: {:?})",
task.id,
task.target,
attention_class
);
ScheduleHandle::scheduled(task.id)
}
fn classify_attention(&self, task: &ResonanceTask) -> AttentionClass {
task.attention_class
}
fn has_attention_for(&self, _task: &ResonanceTask) -> bool {
true
}
pub async fn trip_circuit_breaker(&self, resonator: ResonatorId) {
let mut breakers = self.circuit_breakers.write().await;
let breaker = breakers
.entry(resonator)
.or_insert_with(|| CircuitBreaker::new(self.config.circuit_breaker_threshold));
breaker.trip();
tracing::warn!("Circuit breaker tripped for {}", resonator);
}
pub async fn reset_circuit_breaker(&self, resonator: &ResonatorId) {
let mut breakers = self.circuit_breakers.write().await;
if let Some(breaker) = breakers.get_mut(resonator) {
breaker.reset();
tracing::info!("Circuit breaker reset for {}", resonator);
}
}
pub async fn next_task(&self) -> Option<ResonanceTask> {
let mut queues = self.queues.write().await;
for class in &[
AttentionClass::Critical,
AttentionClass::High,
AttentionClass::Normal,
AttentionClass::Low,
] {
if let Some(queue) = queues.get_mut(class) {
if let Some(task) = queue.pop() {
return Some(task);
}
}
}
None
}
}
#[derive(Debug, Clone)]
pub struct ResonanceTask {
pub id: TaskId,
pub target: ResonatorId,
pub attention_class: AttentionClass,
pub priority: u32,
pub payload: TaskPayload,
}
impl ResonanceTask {
pub fn new(target: ResonatorId, attention_class: AttentionClass, payload: TaskPayload) -> Self {
Self {
id: TaskId::generate(),
target,
attention_class,
priority: 0,
payload,
}
}
}
#[derive(Debug, Clone)]
pub enum TaskPayload {
ProcessCoupling(CouplingId),
FormMeaning,
StabilizeIntent,
ExecuteCommitment(CommitmentId),
}
struct PriorityQueue {
heap: BinaryHeap<PrioritizedTask>,
}
impl PriorityQueue {
fn new() -> Self {
Self {
heap: BinaryHeap::new(),
}
}
fn push(&mut self, task: ResonanceTask) {
self.heap.push(PrioritizedTask(task));
}
fn pop(&mut self) -> Option<ResonanceTask> {
self.heap.pop().map(|pt| pt.0)
}
fn is_full(&self, max_size: usize) -> bool {
self.heap.len() >= max_size
}
}
struct PrioritizedTask(ResonanceTask);
impl Ord for PrioritizedTask {
fn cmp(&self, other: &Self) -> Ordering {
self.0.priority.cmp(&other.0.priority)
}
}
impl PartialOrd for PrioritizedTask {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Eq for PrioritizedTask {}
impl PartialEq for PrioritizedTask {
fn eq(&self, other: &Self) -> bool {
self.0.priority == other.0.priority
}
}
struct CircuitBreaker {
state: CircuitBreakerState,
failure_threshold: u32,
failure_count: u32,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
enum CircuitBreakerState {
Closed,
Open,
HalfOpen,
}
impl CircuitBreaker {
fn new(failure_threshold: u32) -> Self {
Self {
state: CircuitBreakerState::Closed,
failure_threshold,
failure_count: 0,
}
}
fn is_open(&self) -> bool {
self.state == CircuitBreakerState::Open
}
fn trip(&mut self) {
self.failure_count += 1;
if self.failure_count >= self.failure_threshold {
self.state = CircuitBreakerState::Open;
}
}
fn reset(&mut self) {
self.state = CircuitBreakerState::Closed;
self.failure_count = 0;
}
}