use crate::{Result, Error};
use super::{StorageEngine, IncrementalRefresher};
use super::mv_incremental::DeltaTracker;
use serde::{Serialize, Deserialize};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use parking_lot::Mutex;
use tracing::{debug, info, warn, error};
/// Urgency of a materialized-view refresh request.
///
/// The derived ordering follows the explicit discriminants, so
/// `Critical > High > Normal > Low`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Priority {
/// Lowest urgency.
Low = 0,
/// Middle urgency; this is the value returned by the `Default` impl.
Normal = 1,
/// Ranks above `Normal`.
High = 2,
/// Highest urgency.
Critical = 3,
}
impl Default for Priority {
fn default() -> Self {
Priority::Normal
}
}
/// One pending refresh of a materialized view, as stored in the scheduler's
/// priority queue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RefreshTask {
/// Name of the materialized view to refresh.
pub mv_name: String,
/// Urgency of the request; higher priorities are popped first.
pub priority: Priority,
/// Wall-clock time at which the task was created.
pub scheduled_time: SystemTime,
/// Rough estimate of how long the refresh will take.
pub estimated_duration: Duration,
}
/// Queue ordering for refresh tasks (the "greatest" task is popped first from
/// a max-heap such as `BinaryHeap`).
///
/// 1. Higher `priority` wins.
/// 2. Among equal priorities, the *earlier* `scheduled_time` wins (note the
///    reversed comparison), giving FIFO behaviour within a priority level.
/// 3. `mv_name` and `estimated_duration` are used purely as final tie-breakers
///    so that `cmp` returns `Ordering::Equal` only when the derived `==` also
///    holds, as the `Ord`/`Eq` contract requires.
impl Ord for RefreshTask {
    fn cmp(&self, other: &Self) -> Ordering {
        self.priority
            .cmp(&other.priority)
            // Reversed on purpose: older tasks must compare as "greater".
            .then_with(|| other.scheduled_time.cmp(&self.scheduled_time))
            // Tie-breakers that keep the ordering consistent with `Eq`.
            .then_with(|| self.mv_name.cmp(&other.mv_name))
            .then_with(|| self.estimated_duration.cmp(&other.estimated_duration))
    }
}
/// Tasks are totally ordered, so the partial comparison simply wraps the
/// result of the `Ord` implementation.
impl PartialOrd for RefreshTask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
/// Tunable settings for the materialized-view refresh scheduler.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerConfig {
/// CPU usage (percent) above which the scheduler loop skips a refresh batch.
pub max_cpu_percent: f64,
/// Seconds the scheduler loop sleeps between iterations.
pub check_interval_secs: u64,
/// Upper bound on the number of tasks taken from the queue per iteration.
pub batch_size: usize,
/// Upper bound on the number of refreshes running at the same time.
pub max_concurrent: usize,
/// When `true`, the scheduler loop adjusts `batch_size` from observed CPU usage.
pub adaptive_batch_sizing: bool,
/// When `true`, a failed refresh is put back on the queue.
pub auto_retry: bool,
/// Number of retry attempts allowed for one view before giving up.
pub max_retries: usize,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
max_cpu_percent: 70.0,
check_interval_secs: 5,
batch_size: 10,
max_concurrent: 4,
adaptive_batch_sizing: true,
auto_retry: true,
max_retries: 3,
}
}
}
/// Builder-style setters. Each consumes the configuration and returns the
/// updated value so calls can be chained.
impl SchedulerConfig {
    /// Creates a configuration populated with the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the CPU ceiling in percent, clamped to `0.0..=100.0`.
    ///
    /// A NaN argument is ignored and the current value is kept: NaN would
    /// survive `clamp`, and every `usage > NaN` comparison is false, which
    /// would silently switch CPU throttling off.
    #[must_use]
    pub fn with_max_cpu_percent(mut self, percent: f64) -> Self {
        if !percent.is_nan() {
            self.max_cpu_percent = percent.clamp(0.0, 100.0);
        }
        self
    }

    /// Sets the pause between scheduler iterations, in seconds (minimum 1).
    #[must_use]
    pub fn with_check_interval(mut self, seconds: u64) -> Self {
        self.check_interval_secs = seconds.max(1);
        self
    }

    /// Sets the maximum number of tasks taken per iteration (minimum 1).
    #[must_use]
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size.max(1);
        self
    }

    /// Sets the maximum number of simultaneous refreshes (minimum 1).
    #[must_use]
    pub fn with_max_concurrent(mut self, max: usize) -> Self {
        self.max_concurrent = max.max(1);
        self
    }

    /// Enables or disables CPU-driven batch-size adjustment.
    #[must_use]
    pub fn with_adaptive_batch_sizing(mut self, enabled: bool) -> Self {
        self.adaptive_batch_sizing = enabled;
        self
    }

    /// Enables or disables automatic rescheduling of failed refreshes.
    #[must_use]
    pub fn with_auto_retry(mut self, enabled: bool) -> Self {
        self.auto_retry = enabled;
        self
    }

    /// Sets how many retry attempts a failing view gets. Zero is accepted and
    /// means a failure is never retried even when `auto_retry` is on.
    #[must_use]
    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
        self.max_retries = max_retries;
        self
    }
}
/// Samples system-wide CPU usage through `sysinfo` and keeps a smoothed value.
pub struct CpuMonitor {
// Most recent smoothed usage in percent; starts at 0.0, which the smoothing
// code also treats as "no previous sample".
last_cpu_usage: Arc<Mutex<f64>>,
// `sysinfo` handle that is refreshed on every measurement.
system: Arc<Mutex<sysinfo::System>>,
}
impl CpuMonitor {
    /// Creates a monitor with no recorded sample (the stored usage is 0.0).
    pub fn new() -> Self {
        let last_cpu_usage = Arc::new(Mutex::new(0.0));
        let system = Arc::new(Mutex::new(sysinfo::System::new()));
        Self { last_cpu_usage, system }
    }

    /// Measures the current CPU usage, averaged over all CPUs, in percent.
    ///
    /// The call refreshes the CPU counters twice with a 100 ms blocking sleep
    /// in between, holding the internal `system` lock for the whole time, so
    /// it should not be called directly on an async executor thread.
    ///
    /// # Errors
    /// Returns a storage error when `sysinfo` reports no CPUs.
    pub fn get_cpu_usage(&self) -> Result<f64> {
        let mut sys = self.system.lock();

        // Two refreshes separated by a short pause.
        sys.refresh_cpu();
        std::thread::sleep(Duration::from_millis(100));
        sys.refresh_cpu();

        let cores = sys.cpus();
        if cores.is_empty() {
            return Err(Error::storage("No CPU information available"));
        }

        let summed: f32 = cores.iter().map(|core| core.cpu_usage()).sum::<f32>();
        let mean = summed / cores.len() as f32;
        Ok(f64::from(mean))
    }

    /// Takes a fresh measurement, blends it into the stored value with an
    /// exponential moving average (70% previous, 30% new), and returns the
    /// blended value.
    ///
    /// A stored value of exactly 0.0 is treated as "no previous sample", in
    /// which case the raw measurement is used as-is.
    ///
    /// # Errors
    /// Propagates any error from [`CpuMonitor::get_cpu_usage`].
    pub fn get_smoothed_cpu_usage(&self) -> Result<f64> {
        let raw = self.get_cpu_usage()?;

        let mut previous = self.last_cpu_usage.lock();
        let blended = match *previous {
            prev if prev == 0.0 => raw,
            prev => 0.7 * prev + 0.3 * raw,
        };
        *previous = blended;

        debug!("CPU usage: raw={:.1}%, smoothed={:.1}%", raw, blended);
        Ok(blended)
    }
}
impl Default for CpuMonitor {
fn default() -> Self {
Self::new()
}
}
/// Summary of one completed refresh, produced by `MVScheduler::perform_refresh`.
#[derive(Debug, Clone)]
struct RefreshResult {
// `Debug` rendering of the strategy reported by the incremental refresher.
strategy_used: String,
// Sum of inserted, updated and deleted rows.
rows_affected: u64,
// Duration reported by the incremental refresher.
duration: Duration,
}
/// Background scheduler that queues materialized-view refreshes by priority
/// and runs them subject to CPU and concurrency limits.
///
/// Every field is behind an `Arc`, so clones of the scheduler share one state.
pub struct MVScheduler {
// Live configuration; `batch_size` may be changed at runtime by the scheduler.
config: Arc<Mutex<SchedulerConfig>>,
// Storage engine used to open the materialized-view catalog.
storage: Arc<StorageEngine>,
// Delta tracker handed to the incremental refresher at construction.
delta_tracker: Arc<DeltaTracker>,
// Performs the actual refresh and the cost estimation.
incremental_refresher: Arc<IncrementalRefresher>,
// Pending tasks; the greatest task per `RefreshTask`'s `Ord` is popped first.
refresh_queue: Arc<Mutex<BinaryHeap<RefreshTask>>>,
// Names of views whose refresh is currently in progress.
running_tasks: Arc<Mutex<HashSet<String>>>,
// Source of the smoothed CPU usage used for throttling.
cpu_monitor: Arc<CpuMonitor>,
// Failed-attempt counter per view name, cleared on success or when giving up.
retry_counts: Arc<Mutex<std::collections::HashMap<String, usize>>>,
}
impl MVScheduler {
pub fn new(
config: SchedulerConfig,
storage: Arc<StorageEngine>,
) -> Self {
info!("Initializing MVScheduler with config: max_cpu={}%, check_interval={}s, max_concurrent={}",
config.max_cpu_percent, config.check_interval_secs, config.max_concurrent);
let delta_tracker = Arc::new(DeltaTracker::new(Arc::clone(&storage)));
let incremental_refresher = Arc::new(IncrementalRefresher::new(
Arc::clone(&storage),
Arc::clone(&delta_tracker),
));
Self {
config: Arc::new(Mutex::new(config)),
storage,
delta_tracker,
incremental_refresher,
refresh_queue: Arc::new(Mutex::new(BinaryHeap::new())),
running_tasks: Arc::new(Mutex::new(HashSet::new())),
cpu_monitor: Arc::new(CpuMonitor::new()),
retry_counts: Arc::new(Mutex::new(std::collections::HashMap::new())),
}
}
pub fn schedule_refresh(&self, mv_name: &str, priority: Priority) -> Result<()> {
{
let running = self.running_tasks.lock();
if running.contains(mv_name) {
debug!("MV '{}' is already being refreshed, skipping schedule", mv_name);
return Ok(());
}
}
{
let queue = self.refresh_queue.lock();
if queue.iter().any(|task| task.mv_name == mv_name) {
debug!("MV '{}' is already in refresh queue, skipping schedule", mv_name);
return Ok(());
}
}
let estimated_duration = self.estimate_duration(mv_name)?;
let task = RefreshTask {
mv_name: mv_name.to_string(),
priority,
scheduled_time: SystemTime::now(),
estimated_duration,
};
info!("Scheduled MV '{}' for refresh with priority {:?}", mv_name, priority);
self.refresh_queue.lock().push(task);
Ok(())
}
pub async fn run(&self) -> Result<()> {
info!("Starting MVScheduler background loop");
loop {
let check_interval = self.config.lock().check_interval_secs;
tokio::time::sleep(Duration::from_secs(check_interval)).await;
let monitor = self.cpu_monitor.clone();
let cpu_usage = match tokio::task::spawn_blocking(move || {
monitor.get_smoothed_cpu_usage()
}).await {
Ok(Ok(usage)) => usage,
Ok(Err(e)) => {
warn!("Failed to get CPU usage: {}", e);
continue;
}
Err(e) => {
warn!("CPU monitor task panicked: {}", e);
continue;
}
};
let max_cpu = self.config.lock().max_cpu_percent;
if cpu_usage > max_cpu {
debug!("CPU usage {:.1}% exceeds threshold {:.1}%, skipping refresh batch",
cpu_usage, max_cpu);
continue;
}
if self.config.lock().adaptive_batch_sizing {
self.adjust_batch_size(cpu_usage);
}
let max_concurrent = self.config.lock().max_concurrent;
let available_capacity = max_concurrent.saturating_sub(self.running_tasks.lock().len());
if available_capacity == 0 {
debug!("No available capacity for new refresh tasks");
continue;
}
let batch_size = self.config.lock().batch_size;
let mut tasks_to_run = Vec::new();
{
let mut queue = self.refresh_queue.lock();
for _ in 0..available_capacity.min(batch_size) {
if let Some(task) = queue.pop() {
tasks_to_run.push(task);
} else {
break;
}
}
}
if tasks_to_run.is_empty() {
debug!("No pending refresh tasks in queue");
continue;
}
info!("Processing {} refresh tasks from queue", tasks_to_run.len());
for task in tasks_to_run {
let scheduler = self.clone();
let handle = tokio::spawn(async move {
if let Err(e) = scheduler.execute_refresh(task).await {
error!("Failed to execute refresh task: {}", e);
}
});
tokio::spawn(async move {
if let Err(e) = handle.await {
if e.is_panic() {
error!("MV refresh task panicked: {}", e);
}
}
});
}
}
}
async fn execute_refresh(&self, task: RefreshTask) -> Result<()> {
let mv_name = task.mv_name.clone();
self.running_tasks.lock().insert(mv_name.clone());
info!("Starting refresh for MV '{}' (priority: {:?})", mv_name, task.priority);
let start = SystemTime::now();
let result = self.perform_refresh(&mv_name).await;
let duration = start.elapsed()
.unwrap_or(Duration::from_secs(0));
self.running_tasks.lock().remove(&mv_name);
match result {
Ok(refresh_result) => {
info!("Refreshed MV '{}' in {:?} using {} strategy, {} rows affected",
mv_name, duration, refresh_result.strategy_used, refresh_result.rows_affected);
self.retry_counts.lock().remove(&mv_name);
Ok(())
}
Err(e) => {
error!("Failed to refresh MV '{}': {}", mv_name, e);
let should_retry = self.config.lock().auto_retry;
if should_retry {
let mut retry_counts = self.retry_counts.lock();
let retry_count = retry_counts.entry(mv_name.clone()).or_insert(0);
*retry_count += 1;
let max_retries = self.config.lock().max_retries;
if *retry_count <= max_retries {
warn!("Rescheduling MV '{}' for retry (attempt {} of {})",
mv_name, retry_count, max_retries);
let new_priority = match task.priority {
Priority::Critical => Priority::High,
Priority::High => Priority::Normal,
Priority::Normal => Priority::Low,
Priority::Low => Priority::Low,
};
drop(retry_counts);
self.schedule_refresh(&mv_name, new_priority)?;
} else {
error!("MV '{}' exceeded maximum retry attempts ({}), giving up",
mv_name, max_retries);
retry_counts.remove(&mv_name);
}
}
Err(e)
}
}
}
async fn perform_refresh(&self, mv_name: &str) -> Result<RefreshResult> {
use super::MaterializedViewCatalog;
let catalog = MaterializedViewCatalog::new(&self.storage);
let mut metadata = catalog.get_view(mv_name)?;
let cost = self.incremental_refresher.estimate_refresh_cost(&metadata)?;
debug!("Refresh cost for '{}': incremental={:.2}s, full={:.2}s, strategy={:?}",
mv_name, cost.incremental_cost, cost.full_cost, cost.recommendation);
let refresh_result = self.incremental_refresher.refresh_incremental(mv_name)?;
let rows_affected = refresh_result.rows_inserted
+ refresh_result.rows_updated
+ refresh_result.rows_deleted;
metadata.mark_refreshed(rows_affected as u64);
catalog.update_view(&metadata)?;
Ok(RefreshResult {
strategy_used: format!("{:?}", refresh_result.strategy_used),
rows_affected: rows_affected as u64,
duration: refresh_result.duration,
})
}
fn estimate_duration(&self, mv_name: &str) -> Result<Duration> {
use super::MaterializedViewCatalog;
let catalog = MaterializedViewCatalog::new(&self.storage);
match catalog.get_view(mv_name) {
Ok(metadata) => {
let row_count = metadata.row_count.unwrap_or(1000);
let estimated_ms = (row_count / 1000).max(1) * 10;
Ok(Duration::from_millis(estimated_ms))
}
Err(_) => {
Ok(Duration::from_secs(5))
}
}
}
fn adjust_batch_size(&self, cpu_usage: f64) {
let mut config = self.config.lock();
let old_batch_size = config.batch_size;
if cpu_usage < 50.0 {
config.batch_size = (config.batch_size + 5).min(50);
} else if cpu_usage > 80.0 {
config.batch_size = config.batch_size.saturating_sub(5).max(1);
}
if config.batch_size != old_batch_size {
debug!("Adjusted batch size from {} to {} (CPU: {:.1}%)",
old_batch_size, config.batch_size, cpu_usage);
}
}
pub fn on_base_table_change(&self, table_name: &str) -> Result<()> {
use super::MaterializedViewCatalog;
debug!("Base table '{}' changed, checking dependent MVs", table_name);
let catalog = MaterializedViewCatalog::new(&self.storage);
let all_mvs = catalog.list_views()?;
let mut affected_count = 0;
for mv_name in all_mvs {
let metadata = catalog.get_view(&mv_name)?;
if metadata.base_tables.contains(&table_name.to_string()) {
self.schedule_refresh(&mv_name, Priority::Normal)?;
affected_count += 1;
}
}
if affected_count > 0 {
info!("Scheduled {} dependent MVs for refresh after table '{}' change",
affected_count, table_name);
}
Ok(())
}
pub fn get_stats(&self) -> SchedulerStats {
SchedulerStats {
queue_size: self.refresh_queue.lock().len(),
running_tasks: self.running_tasks.lock().len(),
cpu_usage: *self.cpu_monitor.last_cpu_usage.lock(),
}
}
}
/// Cloning is shallow: every field is an `Arc`, so the clone shares the queue,
/// running set, configuration and counters with the original.
impl Clone for MVScheduler {
    fn clone(&self) -> Self {
        let Self {
            config,
            storage,
            delta_tracker,
            incremental_refresher,
            refresh_queue,
            running_tasks,
            cpu_monitor,
            retry_counts,
        } = self;
        Self {
            config: Arc::clone(config),
            storage: Arc::clone(storage),
            delta_tracker: Arc::clone(delta_tracker),
            incremental_refresher: Arc::clone(incremental_refresher),
            refresh_queue: Arc::clone(refresh_queue),
            running_tasks: Arc::clone(running_tasks),
            cpu_monitor: Arc::clone(cpu_monitor),
            retry_counts: Arc::clone(retry_counts),
        }
    }
}
/// Point-in-time snapshot returned by `MVScheduler::get_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerStats {
/// Number of tasks waiting in the refresh queue.
pub queue_size: usize,
/// Number of refreshes currently marked as running.
pub running_tasks: usize,
/// Last smoothed CPU usage recorded by the monitor (0.0 before any sample).
pub cpu_usage: f64,
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::{Config, Schema, Column, DataType};

    /// Builds a queue entry stamped with the current time and a 1 s estimate.
    fn make_task(name: &str, priority: Priority) -> RefreshTask {
        RefreshTask {
            mv_name: name.to_string(),
            priority,
            scheduled_time: SystemTime::now(),
            estimated_duration: Duration::from_secs(1),
        }
    }

    /// Opens a fresh in-memory storage engine.
    fn in_memory_storage() -> Arc<StorageEngine> {
        let config = Config::in_memory();
        Arc::new(StorageEngine::open_in_memory(&config).unwrap())
    }

    /// Registers a view named `test_mv` over table `test` with a single-column
    /// schema, then returns a default-configured scheduler on the same storage.
    fn scheduler_with_test_mv(column: Column, sql: &str) -> MVScheduler {
        use super::super::MaterializedViewCatalog;
        use crate::sql::LogicalPlan;

        let storage = in_memory_storage();
        {
            let mv_catalog = MaterializedViewCatalog::new(&storage);
            let schema = Schema::new(vec![column]);
            let query_plan = LogicalPlan::Scan {
                alias: None,
                table_name: "test".to_string(),
                schema: std::sync::Arc::new(schema.clone()),
                projection: None,
                as_of: None,
            };
            let query_plan_bytes = bincode::serialize(&query_plan).unwrap();
            let metadata = super::super::MaterializedViewMetadata::new(
                "test_mv".to_string(),
                sql.to_string(),
                query_plan_bytes,
                vec!["test".to_string()],
                schema,
            );
            mv_catalog.create_view(metadata).unwrap();
        }
        MVScheduler::new(SchedulerConfig::default(), storage)
    }

    #[test]
    fn test_priority_ordering() {
        assert!(Priority::Critical > Priority::High);
        assert!(Priority::High > Priority::Normal);
        assert!(Priority::Normal > Priority::Low);
    }

    #[test]
    fn test_task_priority_queue() {
        let mut queue = BinaryHeap::new();
        queue.push(make_task("low_task", Priority::Low));
        queue.push(make_task("critical_task", Priority::Critical));
        queue.push(make_task("normal_task", Priority::Normal));

        // Tasks must come out highest priority first.
        for expected in [Priority::Critical, Priority::Normal, Priority::Low] {
            let task = queue.pop().unwrap();
            assert_eq!(task.priority, expected);
        }
    }

    #[test]
    fn test_cpu_monitor_creation() {
        let monitor = CpuMonitor::new();
        let usage = monitor.get_cpu_usage();
        assert!(usage.is_ok());
        let cpu = usage.unwrap();
        assert!((0.0..=100.0).contains(&cpu));
    }

    #[test]
    fn test_scheduler_config() {
        let config = SchedulerConfig::default()
            .with_max_cpu_percent(80.0)
            .with_check_interval(10)
            .with_batch_size(20)
            .with_max_concurrent(8);
        assert_eq!(config.max_cpu_percent, 80.0);
        assert_eq!(config.check_interval_secs, 10);
        assert_eq!(config.batch_size, 20);
        assert_eq!(config.max_concurrent, 8);
    }

    #[test]
    fn test_scheduler_creation() {
        let scheduler = MVScheduler::new(SchedulerConfig::default(), in_memory_storage());
        let stats = scheduler.get_stats();
        assert_eq!(stats.queue_size, 0);
        assert_eq!(stats.running_tasks, 0);
    }

    #[test]
    fn test_schedule_refresh() {
        let scheduler = scheduler_with_test_mv(
            Column::new("count", DataType::Int8),
            "SELECT COUNT(*) FROM test",
        );
        let result = scheduler.schedule_refresh("test_mv", Priority::High);
        assert!(result.is_ok());
        let stats = scheduler.get_stats();
        assert_eq!(stats.queue_size, 1);
    }

    #[test]
    fn test_duplicate_scheduling_prevention() {
        let scheduler = scheduler_with_test_mv(
            Column::new("id", DataType::Int4),
            "SELECT * FROM test",
        );
        // The second request for the same view must be ignored.
        scheduler.schedule_refresh("test_mv", Priority::High).unwrap();
        scheduler.schedule_refresh("test_mv", Priority::High).unwrap();
        let stats = scheduler.get_stats();
        assert_eq!(stats.queue_size, 1);
    }
}