use super::metrics::{Metrics, MetricsSnapshot};
use crate::scheduler::adaptive::AdaptiveScheduler;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
/// Thresholds and cadence settings consumed by [`FeedbackController`].
#[derive(Debug, Clone)]
pub struct FeedbackConfig {
    /// Throughput floor. A snapshot whose `tasks_per_second()` falls below this
    /// value (once more than 100 tasks have executed) yields
    /// `FeedbackAction::IncreaseParallelism`.
    pub min_task_rate: f64,

    /// Ceiling for a snapshot's `p99_latency_ns`; exceeding it yields
    /// `FeedbackAction::ReduceLoad`.
    pub max_latency_ns: u64,

    /// Optional power budget.
    ///
    /// NOTE(review): the controller logic visible in this module never reads
    /// this field — confirm where (or whether) it is enforced.
    pub max_power_watts: Option<f64>,

    /// Minimum time between two effective `FeedbackController::update` calls;
    /// calls arriving sooner return `FeedbackAction::None` without sampling.
    pub update_interval: Duration,

    /// Maximum number of snapshots kept in the controller's history; the
    /// oldest snapshot is dropped once this is exceeded.
    pub history_size: usize,
}
impl Default for FeedbackConfig {
fn default() -> Self {
Self {
min_task_rate: 100.0,
max_latency_ns: 10_000_000, max_power_watts: None,
update_interval: Duration::from_millis(100),
history_size: 100,
}
}
}
/// Periodically samples a shared [`Metrics`] instance and turns each sample
/// into a [`FeedbackAction`].
///
/// The mutable state lives behind `RwLock`s so that every method can take
/// `&self`.
pub struct FeedbackController {
    /// Source of snapshots; `snapshot()` is called on each effective update.
    metrics: Arc<Metrics>,

    /// Thresholds, update cadence and history capacity.
    config: FeedbackConfig,

    /// Retained snapshots, oldest at the front, newest at the back.
    history: RwLock<VecDeque<MetricsSnapshot>>,

    /// Instant of the last effective update (initialised at construction).
    last_update: RwLock<Instant>,
}
impl FeedbackController {
    /// How far back from the newest snapshot `compute_delta` reaches for its
    /// baseline: the 10th-newest entry, or the oldest one if the history is
    /// shorter than that.
    const DELTA_LOOKBACK: usize = 10;

    /// Creates a controller that samples `metrics` according to `config`.
    ///
    /// The history buffer is pre-sized to `config.history_size`. The update
    /// timer starts now, so an `update` call made sooner than
    /// `config.update_interval` after construction is a no-op.
    pub fn new(metrics: Arc<Metrics>, config: FeedbackConfig) -> Self {
        let history = VecDeque::with_capacity(config.history_size);
        Self {
            metrics,
            history: RwLock::new(history),
            last_update: RwLock::new(Instant::now()),
            config,
        }
    }

    /// Takes a new snapshot if at least `config.update_interval` has elapsed
    /// since the last effective update, records it in the history and returns
    /// the action suggested by [`Self::analyze`].
    ///
    /// Returns `FeedbackAction::None` without sampling when called too early.
    ///
    /// The interval check and the `last_update` write happen under a single
    /// write lock, so concurrent callers cannot both pass the gate for the
    /// same interval. The lock is held until the snapshot has been stored,
    /// which keeps the history in sampling order.
    pub fn update(&self) -> FeedbackAction {
        let now = Instant::now();
        let interval = self.config.update_interval;

        // Fast path under a shared lock: most calls arrive too early.
        // `saturating_duration_since` yields zero if another thread has
        // already advanced `last_update` past `now`.
        let last_seen = *self.last_update.read();
        if now.saturating_duration_since(last_seen) < interval {
            return FeedbackAction::None;
        }

        // Slow path: re-check under the exclusive lock, because another
        // caller may have claimed this interval between the two locks.
        let mut last_update = self.last_update.write();
        if now.saturating_duration_since(*last_update) < interval {
            return FeedbackAction::None;
        }
        *last_update = now;

        let snapshot = self.metrics.snapshot();
        // Analyze before storing so the snapshot can be moved, not cloned.
        let action = self.analyze(&snapshot);

        let mut history = self.history.write();
        history.push_back(snapshot);
        if history.len() > self.config.history_size {
            history.pop_front();
        }

        action
    }

    /// Maps a single snapshot to at most one action. Checks run in priority
    /// order and the first match wins:
    ///
    /// 1. p99 latency above `config.max_latency_ns` -> `ReduceLoad`
    /// 2. task rate below `config.min_task_rate` -> `IncreaseParallelism`
    /// 3. utilization below 0.3 -> `OptimizeResources`
    ///
    /// Checks 2 and 3 only apply once more than 100 tasks have executed.
    /// `config.max_power_watts` is not consulted here.
    fn analyze(&self, current: &MetricsSnapshot) -> FeedbackAction {
        if current.p99_latency_ns > self.config.max_latency_ns {
            return FeedbackAction::ReduceLoad {
                reason: "High latency detected".to_string(),
            };
        }

        // Throughput and utilization are not judged on 100 tasks or fewer.
        if current.tasks_executed <= 100 {
            return FeedbackAction::None;
        }

        if current.tasks_per_second() < self.config.min_task_rate {
            return FeedbackAction::IncreaseParallelism {
                reason: "Low throughput detected".to_string(),
            };
        }

        if current.utilization() < 0.3 {
            return FeedbackAction::OptimizeResources {
                reason: "Low utilization detected".to_string(),
            };
        }

        FeedbackAction::None
    }

    /// Compares the newest snapshot with a baseline up to
    /// `DELTA_LOOKBACK - 1` entries older.
    ///
    /// Returns `None` when any of the following holds:
    /// - fewer than two snapshots are stored;
    /// - no time separates the two snapshots;
    /// - the executed-task counter went backwards between them (e.g. the
    ///   metrics were reset), in which case no meaningful rate exists.
    ///
    /// `task_rate_change` is the task rate over the window minus the
    /// baseline snapshot's own `tasks_per_second()`.
    pub fn compute_delta(&self) -> Option<MetricsDelta> {
        let history = self.history.read();
        if history.len() < 2 {
            return None;
        }

        let current = history.back()?;
        let baseline_index = history.len().saturating_sub(Self::DELTA_LOOKBACK);
        let previous = history.get(baseline_index)?;

        let time_delta = current
            .timestamp
            .duration_since(previous.timestamp)
            .as_secs_f64();
        if time_delta == 0.0 {
            return None;
        }

        // A plain `-` would panic in debug builds (and wrap to a huge bogus
        // rate in release builds) if the counter regressed.
        let tasks_in_window = current
            .tasks_executed
            .checked_sub(previous.tasks_executed)?;
        let window_rate = tasks_in_window as f64 / time_delta;

        Some(MetricsDelta {
            task_rate_change: window_rate - previous.tasks_per_second(),
            latency_p99_change: current.p99_latency_ns as i64 - previous.p99_latency_ns as i64,
            utilization_change: current.utilization() - previous.utilization(),
        })
    }

    /// Classifies each component of [`Self::compute_delta`] as a trend.
    /// Every field is `None` when no delta is available.
    ///
    /// NOTE(review): `classify_trend` applies one fixed threshold to all
    /// three deltas regardless of their units — confirm this is intended.
    pub fn trends(&self) -> Trends {
        match self.compute_delta() {
            Some(delta) => Trends {
                task_rate_trend: Some(classify_trend(delta.task_rate_change)),
                latency_trend: Some(classify_trend(delta.latency_p99_change as f64)),
                utilization_trend: Some(classify_trend(delta.utilization_change)),
            },
            None => Trends {
                task_rate_trend: None,
                latency_trend: None,
                utilization_trend: None,
            },
        }
    }
}
/// Adjustment suggested by `FeedbackController::update`.
///
/// Each non-`None` variant carries a human-readable `reason`.
#[derive(Debug, Clone, PartialEq)]
pub enum FeedbackAction {
    /// Nothing to do. Also returned when `update` is called before the
    /// configured update interval has elapsed.
    None,

    /// The task rate fell below `FeedbackConfig::min_task_rate`.
    IncreaseParallelism { reason: String },

    /// The p99 latency exceeded `FeedbackConfig::max_latency_ns`.
    ReduceLoad { reason: String },

    /// Utilization fell below 0.3.
    OptimizeResources { reason: String },
}
/// Change between the newest snapshot and an older baseline snapshot, as
/// produced by `FeedbackController::compute_delta`.
#[derive(Debug, Clone)]
pub struct MetricsDelta {
    /// Task rate over the comparison window minus the baseline snapshot's
    /// own `tasks_per_second()`.
    pub task_rate_change: f64,

    /// Newest p99 latency minus the baseline p99 latency, in nanoseconds;
    /// negative when latency went down.
    pub latency_p99_change: i64,

    /// Newest utilization minus baseline utilization.
    pub utilization_change: f64,
}
/// Direction of a metric's change, as decided by `classify_trend`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Trend {
    /// The change is above +0.05.
    Increasing,

    /// The change lies within ±0.05 (bounds included).
    Stable,

    /// The change is below -0.05.
    Decreasing,
}
/// Per-metric trend summary returned by `FeedbackController::trends`.
///
/// A field is `None` when no delta could be computed.
#[derive(Debug, Clone)]
pub struct Trends {
    /// Trend of the task-rate change.
    pub task_rate_trend: Option<Trend>,

    /// Trend of the p99 latency change.
    pub latency_trend: Option<Trend>,

    /// Trend of the utilization change.
    pub utilization_trend: Option<Trend>,
}
/// Buckets a signed change into a [`Trend`] using a fixed dead band of ±0.05.
///
/// Values strictly above +0.05 are `Increasing` and values strictly below
/// -0.05 are `Decreasing`. Everything else is `Stable`, including the bounds
/// themselves and NaN, for which both comparisons are false.
fn classify_trend(value: f64) -> Trend {
    const THRESHOLD: f64 = 0.05;

    match value {
        v if v > THRESHOLD => Trend::Increasing,
        v if v < -THRESHOLD => Trend::Decreasing,
        _ => Trend::Stable,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // The controller's timer starts at construction and the default update
    // interval is 100 ms, so an immediate `update` is expected to be a no-op.
    #[test]
    fn test_feedback_controller() {
        let controller =
            FeedbackController::new(Arc::new(Metrics::new()), FeedbackConfig::default());

        assert_eq!(controller.update(), FeedbackAction::None);
    }

    // One representative value per trend bucket.
    #[test]
    fn test_trend_classification() {
        let cases = [
            (0.1, Trend::Increasing),
            (-0.1, Trend::Decreasing),
            (0.01, Trend::Stable),
        ];

        for &(value, expected) in cases.iter() {
            assert_eq!(classify_trend(value), expected);
        }
    }
}