use crate::{Result, Error};
use super::{StorageEngine, MaterializedViewCatalog, mv_scheduler::{MVScheduler, Priority}};
use serde::{Serialize, Deserialize};
use std::sync::Arc;
use std::time::Duration;
use std::collections::VecDeque;
use parking_lot::{Mutex, RwLock};
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing::{info, debug, warn, error};
use chrono::{DateTime, Utc};
/// One recorded materialized-view refresh, as stored by `AutoRefreshWorker::record_refresh`.
///
/// Field order is part of the serialized representation (serde derive); do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RefreshHistoryEntry {
/// Name of the materialized view that was refreshed.
pub mv_name: String,
/// When the refresh started (UTC).
pub start_time: DateTime<Utc>,
/// When the refresh finished (UTC).
pub end_time: DateTime<Utc>,
/// Whether the refresh succeeded.
pub success: bool,
/// Error text supplied by the caller when the refresh failed, if any.
pub error_message: Option<String>,
/// Row count reported by the caller, if known.
pub rows_affected: Option<i64>,
/// Free-form strategy label supplied by the caller; not validated here.
pub strategy: String,
/// Free-form label describing what triggered the refresh; supplied by the caller and not validated here.
pub trigger: String,
}
/// Maximum number of refresh-history entries retained; once exceeded, the oldest are evicted.
const MAX_HISTORY_ENTRIES: usize = 1_000;
/// Tuning knobs for the background auto-refresh worker.
///
/// The `with_*` builder methods clamp their inputs, but the fields are public and the type
/// is deserializable, so values constructed directly may bypass those clamps.
/// Field order is part of the serialized representation (serde derive); do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoRefreshConfig {
/// When `false`, `AutoRefreshWorker::start` returns without spawning the background loop.
pub enabled: bool,
/// Seconds the background loop sleeps between staleness checks.
pub interval_seconds: u64,
/// A view whose staleness (in seconds) is `>=` this value is scheduled for refresh.
pub staleness_threshold_seconds: i64,
/// Upper bound on refreshes the worker counts as in flight at once.
pub max_concurrent_refreshes: usize,
/// Staleness checks are skipped while the scheduler-reported `cpu_usage` exceeds this value
/// (presumably a percentage in 0..=100 — the builder clamps to that range).
pub max_cpu_percent: f64,
}
impl Default for AutoRefreshConfig {
fn default() -> Self {
Self {
enabled: false,
interval_seconds: 60,
staleness_threshold_seconds: 300, max_concurrent_refreshes: 2,
max_cpu_percent: 50.0,
}
}
}
impl AutoRefreshConfig {
    /// Returns the default configuration; identical to [`AutoRefreshConfig::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Turns the background worker on or off.
    pub fn with_enabled(self, enabled: bool) -> Self {
        Self { enabled, ..self }
    }

    /// Sets the polling interval in seconds; zero is raised to one second.
    pub fn with_interval_seconds(self, seconds: u64) -> Self {
        let interval_seconds = std::cmp::max(seconds, 1);
        Self { interval_seconds, ..self }
    }

    /// Sets the staleness threshold in seconds; negative values are raised to zero.
    pub fn with_staleness_threshold(self, seconds: i64) -> Self {
        let staleness_threshold_seconds = std::cmp::max(seconds, 0);
        Self { staleness_threshold_seconds, ..self }
    }

    /// Sets the concurrent-refresh limit; zero is raised to one.
    pub fn with_max_concurrent(self, max: usize) -> Self {
        let max_concurrent_refreshes = std::cmp::max(max, 1);
        Self { max_concurrent_refreshes, ..self }
    }

    /// Sets the CPU ceiling, clamped into `0.0..=100.0` (a NaN input stays NaN).
    pub fn with_max_cpu_percent(self, percent: f64) -> Self {
        let max_cpu_percent = f64::clamp(percent, 0.0, 100.0);
        Self { max_cpu_percent, ..self }
    }
}
/// Control messages sent from `AutoRefreshWorker` to its background loop.
#[derive(Debug)]
enum WorkerCommand {
/// Leave the loop (after briefly waiting for in-flight refreshes to drain).
Stop,
/// Run a staleness check immediately instead of waiting for the next interval tick.
CheckNow,
}
/// Background worker that periodically looks for stale materialized views and asks the
/// `MVScheduler` to refresh them.
///
/// Only views whose metadata map contains `auto_refresh` parsing to `true` are considered.
pub struct AutoRefreshWorker {
// Shared with the background loop, which re-reads it on every iteration, so
// `update_config` changes are picked up without a restart.
config: Arc<RwLock<AutoRefreshConfig>>,
storage: Arc<StorageEngine>,
scheduler: Arc<MVScheduler>,
// `Some` once `start` has spawned the loop; used to send `Stop` / `CheckNow`.
command_tx: Option<mpsc::UnboundedSender<WorkerCommand>>,
// Join handle of the spawned loop; taken (and awaited) by `stop`.
worker_handle: Option<JoinHandle<()>>,
// Running flag; written by `start`/`stop`/`request_stop` and by the loop when it exits.
is_running: Arc<Mutex<bool>>,
// Refreshes this worker has scheduled and still counts as in flight. The worker never
// observes completion; the count is released on a timer after scheduling.
active_refreshes: Arc<Mutex<usize>>,
// Newest-first log filled by `record_refresh`, capped at `MAX_HISTORY_ENTRIES`.
refresh_history: Arc<Mutex<VecDeque<RefreshHistoryEntry>>>,
}
impl AutoRefreshWorker {
pub fn new(
config: AutoRefreshConfig,
storage: Arc<StorageEngine>,
scheduler: Arc<MVScheduler>,
) -> Self {
info!("Creating AutoRefreshWorker with config: enabled={}, interval={}s, staleness_threshold={}s",
config.enabled, config.interval_seconds, config.staleness_threshold_seconds);
Self {
config: Arc::new(RwLock::new(config)),
storage,
scheduler,
command_tx: None,
worker_handle: None,
is_running: Arc::new(Mutex::new(false)),
active_refreshes: Arc::new(Mutex::new(0)),
refresh_history: Arc::new(Mutex::new(VecDeque::with_capacity(MAX_HISTORY_ENTRIES))),
}
}
pub async fn start(&mut self) -> Result<()> {
let config = self.config.read().clone();
if !config.enabled {
info!("AutoRefreshWorker is disabled, not starting");
return Ok(());
}
{
let mut running = self.is_running.lock();
if *running {
return Err(Error::storage("AutoRefreshWorker is already running"));
}
*running = true;
}
info!("Starting AutoRefreshWorker background task");
let (tx, rx) = mpsc::unbounded_channel();
self.command_tx = Some(tx);
let config = Arc::clone(&self.config);
let storage = Arc::clone(&self.storage);
let scheduler = Arc::clone(&self.scheduler);
let is_running = Arc::clone(&self.is_running);
let active_refreshes = Arc::clone(&self.active_refreshes);
let handle = tokio::spawn(async move {
Self::worker_loop(config, storage, scheduler, is_running, active_refreshes, rx).await;
});
self.worker_handle = Some(handle);
info!("AutoRefreshWorker started successfully");
Ok(())
}
pub fn request_stop(&self) {
if let Some(tx) = &self.command_tx {
let _ = tx.send(WorkerCommand::Stop);
}
*self.is_running.lock() = false;
}
pub async fn stop(&mut self) -> Result<()> {
info!("Stopping AutoRefreshWorker");
if let Some(tx) = &self.command_tx {
let _ = tx.send(WorkerCommand::Stop);
}
if let Some(handle) = self.worker_handle.take() {
match tokio::time::timeout(Duration::from_secs(30), handle).await {
Ok(result) => {
if let Err(e) = result {
error!("Worker task panicked: {}", e);
return Err(Error::storage(format!("Worker task panicked: {}", e)));
}
}
Err(_) => {
warn!("Worker task did not stop within timeout, forcing shutdown");
}
}
}
*self.is_running.lock() = false;
self.command_tx = None;
info!("AutoRefreshWorker stopped");
Ok(())
}
pub fn is_running(&self) -> bool {
*self.is_running.lock()
}
pub fn config(&self) -> AutoRefreshConfig {
self.config.read().clone()
}
pub fn update_config(&self, config: AutoRefreshConfig) {
*self.config.write() = config;
info!("AutoRefreshWorker configuration updated");
}
pub fn check_now(&self) -> Result<()> {
if let Some(tx) = &self.command_tx {
tx.send(WorkerCommand::CheckNow)
.map_err(|e| Error::storage(format!("Failed to send check command: {}", e)))?;
Ok(())
} else {
Err(Error::storage("Worker is not running"))
}
}
pub fn active_refresh_count(&self) -> usize {
*self.active_refreshes.lock()
}
pub fn record_refresh(
&self,
mv_name: String,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
success: bool,
error_message: Option<String>,
rows_affected: Option<i64>,
strategy: String,
trigger: String,
) {
let entry = RefreshHistoryEntry {
mv_name,
start_time,
end_time,
success,
error_message,
rows_affected,
strategy,
trigger,
};
let mut history = self.refresh_history.lock();
history.push_front(entry);
while history.len() > MAX_HISTORY_ENTRIES {
history.pop_back();
}
}
pub fn get_refresh_history(&self, limit: Option<usize>) -> Vec<RefreshHistoryEntry> {
let history = self.refresh_history.lock();
let limit = limit.unwrap_or(history.len());
history.iter().take(limit).cloned().collect()
}
pub fn clear_history(&self) {
self.refresh_history.lock().clear();
}
pub fn history_count(&self) -> usize {
self.refresh_history.lock().len()
}
async fn worker_loop(
config: Arc<RwLock<AutoRefreshConfig>>,
storage: Arc<StorageEngine>,
scheduler: Arc<MVScheduler>,
is_running: Arc<Mutex<bool>>,
active_refreshes: Arc<Mutex<usize>>,
mut command_rx: mpsc::UnboundedReceiver<WorkerCommand>,
) {
info!("AutoRefreshWorker loop started");
loop {
let interval_seconds = config.read().interval_seconds;
tokio::select! {
() = tokio::time::sleep(Duration::from_secs(interval_seconds)) => {
Self::perform_staleness_check(
&config,
&storage,
&scheduler,
&active_refreshes,
).await;
}
Some(cmd) = command_rx.recv() => {
match cmd {
WorkerCommand::Stop => {
info!("Received stop command, shutting down worker");
break;
}
WorkerCommand::CheckNow => {
debug!("Received immediate check command");
Self::perform_staleness_check(
&config,
&storage,
&scheduler,
&active_refreshes,
).await;
}
}
}
}
}
info!("Waiting for active refreshes to complete");
let mut wait_count = 0;
while *active_refreshes.lock() > 0 && wait_count < 30 {
tokio::time::sleep(Duration::from_secs(1)).await;
wait_count += 1;
}
*is_running.lock() = false;
info!("AutoRefreshWorker loop terminated");
}
async fn perform_staleness_check(
config: &Arc<RwLock<AutoRefreshConfig>>,
storage: &Arc<StorageEngine>,
scheduler: &Arc<MVScheduler>,
active_refreshes: &Arc<Mutex<usize>>,
) {
let cfg = config.read().clone();
debug!("Performing staleness check");
let cpu_stats = scheduler.get_stats();
if cpu_stats.cpu_usage > cfg.max_cpu_percent {
debug!(
"CPU usage {:.1}% exceeds threshold {:.1}%, skipping refresh check",
cpu_stats.cpu_usage, cfg.max_cpu_percent
);
return;
}
let active_count = *active_refreshes.lock();
if active_count >= cfg.max_concurrent_refreshes {
debug!(
"Active refresh count {} meets limit {}, skipping",
active_count, cfg.max_concurrent_refreshes
);
return;
}
let catalog = MaterializedViewCatalog::new(storage.as_ref());
let views = match catalog.list_views() {
Ok(v) => v,
Err(e) => {
error!("Failed to list materialized views: {}", e);
return;
}
};
debug!("Checking {} materialized views for staleness", views.len());
let mut stale_views = Vec::new();
for view_name in views {
let metadata = match catalog.get_view(&view_name) {
Ok(m) => m,
Err(e) => {
warn!("Failed to get metadata for view '{}': {}", view_name, e);
continue;
}
};
let auto_refresh_enabled = metadata.metadata
.get("auto_refresh")
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false);
if !auto_refresh_enabled {
continue;
}
if let Some(staleness) = metadata.staleness_seconds() {
if staleness >= cfg.staleness_threshold_seconds {
debug!(
"View '{}' is stale ({} seconds old, threshold: {})",
view_name, staleness, cfg.staleness_threshold_seconds
);
stale_views.push((view_name.clone(), staleness));
}
} else if metadata.is_stale() {
debug!("View '{}' has never been refreshed", view_name);
stale_views.push((view_name.clone(), i64::MAX));
}
}
if stale_views.is_empty() {
debug!("No stale views found");
return;
}
stale_views.sort_by(|a, b| b.1.cmp(&a.1));
let available_slots = cfg.max_concurrent_refreshes.saturating_sub(active_count);
let to_refresh = stale_views.iter().take(available_slots);
for (view_name, staleness) in to_refresh {
info!(
"Scheduling auto-refresh for view '{}' (staleness: {} seconds)",
view_name, staleness
);
*active_refreshes.lock() += 1;
if let Err(e) = scheduler.schedule_refresh(view_name, Priority::Normal) {
error!("Failed to schedule refresh for '{}': {}", view_name, e);
*active_refreshes.lock() = active_refreshes.lock().saturating_sub(1);
}
}
tokio::spawn({
let active_refreshes = Arc::clone(active_refreshes);
async move {
tokio::time::sleep(Duration::from_secs(5)).await;
*active_refreshes.lock() = 0;
}
});
}
}
// Unit and integration-style tests for AutoRefreshConfig and AutoRefreshWorker.
// Several tests start the real background loop and use wall-clock sleeps sized relative
// to the configured interval, so they take a few seconds each.
#[cfg(test)]
// Unwraps are acceptable in tests: a failed setup step should fail the test loudly.
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::{Config, Column, DataType, Schema};
use crate::sql::LogicalPlan;
use crate::storage::mv_scheduler::SchedulerConfig;
// Fresh in-memory storage engine per test.
fn create_test_storage() -> Arc<StorageEngine> {
let config = Config::in_memory();
Arc::new(StorageEngine::open_in_memory(&config).unwrap())
}
// Scheduler with default settings, backed by the given storage.
fn create_test_scheduler(storage: Arc<StorageEngine>) -> Arc<MVScheduler> {
let config = SchedulerConfig::default();
Arc::new(MVScheduler::new(config, storage))
}
// Pins the documented default values.
#[test]
fn test_auto_refresh_config_default() {
let config = AutoRefreshConfig::default();
assert!(!config.enabled);
assert_eq!(config.interval_seconds, 60);
assert_eq!(config.staleness_threshold_seconds, 300);
assert_eq!(config.max_concurrent_refreshes, 2);
assert_eq!(config.max_cpu_percent, 50.0);
}
// Every builder method sets its field when given an in-range value.
#[test]
fn test_auto_refresh_config_builder() {
let config = AutoRefreshConfig::new()
.with_enabled(true)
.with_interval_seconds(30)
.with_staleness_threshold(600)
.with_max_concurrent(4)
.with_max_cpu_percent(75.0);
assert!(config.enabled);
assert_eq!(config.interval_seconds, 30);
assert_eq!(config.staleness_threshold_seconds, 600);
assert_eq!(config.max_concurrent_refreshes, 4);
assert_eq!(config.max_cpu_percent, 75.0);
}
// Out-of-range builder inputs are clamped to their minimum/maximum.
#[test]
fn test_config_validation() {
let config = AutoRefreshConfig::new().with_interval_seconds(0);
assert_eq!(config.interval_seconds, 1);
let config = AutoRefreshConfig::new().with_staleness_threshold(-100);
assert_eq!(config.staleness_threshold_seconds, 0);
let config = AutoRefreshConfig::new().with_max_concurrent(0);
assert_eq!(config.max_concurrent_refreshes, 1);
let config = AutoRefreshConfig::new().with_max_cpu_percent(150.0);
assert_eq!(config.max_cpu_percent, 100.0);
let config = AutoRefreshConfig::new().with_max_cpu_percent(-10.0);
assert_eq!(config.max_cpu_percent, 0.0);
}
// A newly constructed worker is not running and keeps the supplied config.
#[test]
fn test_auto_refresh_worker_creation() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::default();
let worker = AutoRefreshWorker::new(config, storage, scheduler);
assert!(!worker.is_running());
assert!(!worker.config().enabled);
}
// start() on a disabled config succeeds but does not spawn the loop.
#[tokio::test]
async fn test_worker_start_disabled() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::default();
let mut worker = AutoRefreshWorker::new(config, storage, scheduler);
assert!(worker.start().await.is_ok());
assert!(!worker.is_running());
}
// Start, reject a second start while running, then stop cleanly.
#[tokio::test]
async fn test_worker_start_enabled() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::new().with_enabled(true);
let mut worker = AutoRefreshWorker::new(config, storage, scheduler);
assert!(worker.start().await.is_ok());
assert!(worker.is_running());
assert!(worker.start().await.is_err());
assert!(worker.stop().await.is_ok());
assert!(!worker.is_running());
}
// stop() succeeds shortly after start with a 1s interval.
#[tokio::test]
async fn test_worker_stop_graceful() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::new()
.with_enabled(true)
.with_interval_seconds(1);
let mut worker = AutoRefreshWorker::new(config, storage, scheduler);
worker.start().await.unwrap();
assert!(worker.is_running());
tokio::time::sleep(Duration::from_millis(100)).await;
let stop_result = worker.stop().await;
assert!(stop_result.is_ok());
assert!(!worker.is_running());
}
// check_now() errors before start and succeeds once the loop is running.
#[tokio::test]
async fn test_worker_check_now() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::new()
.with_enabled(true)
.with_interval_seconds(60);
let mut worker = AutoRefreshWorker::new(config, storage, scheduler);
assert!(worker.check_now().is_err());
worker.start().await.unwrap();
assert!(worker.check_now().is_ok());
worker.stop().await.unwrap();
}
// update_config() on a running worker is visible through config().
#[tokio::test]
async fn test_worker_update_config() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::new()
.with_enabled(true)
.with_staleness_threshold(300);
let mut worker = AutoRefreshWorker::new(config, storage, scheduler);
worker.start().await.unwrap();
let new_config = AutoRefreshConfig::new()
.with_enabled(true)
.with_staleness_threshold(600);
worker.update_config(new_config);
let current_config = worker.config();
assert_eq!(current_config.staleness_threshold_seconds, 600);
worker.stop().await.unwrap();
}
// A fresh worker reports zero in-flight refreshes.
#[tokio::test]
async fn test_active_refresh_count() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::new().with_enabled(true);
let worker = AutoRefreshWorker::new(config, storage, scheduler);
assert_eq!(worker.active_refresh_count(), 0);
}
// Lets two 1s ticks elapse against an empty catalog; the loop must keep running.
#[tokio::test]
async fn test_staleness_check_with_no_views() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::new()
.with_enabled(true)
.with_interval_seconds(1);
let mut worker = AutoRefreshWorker::new(config, storage, scheduler);
worker.start().await.unwrap();
tokio::time::sleep(Duration::from_millis(2100)).await;
assert!(worker.is_running());
worker.stop().await.unwrap();
}
// Very low CPU ceiling; presumably the scheduler's reported usage exceeds 0.1 so the
// check is skipped (TODO confirm). Only asserts the loop survives one tick.
#[tokio::test]
async fn test_cpu_throttling() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let config = AutoRefreshConfig::new()
.with_enabled(true)
.with_interval_seconds(1)
.with_max_cpu_percent(0.1);
let mut worker = AutoRefreshWorker::new(config, storage, scheduler);
worker.start().await.unwrap();
tokio::time::sleep(Duration::from_millis(1500)).await;
assert!(worker.is_running());
worker.stop().await.unwrap();
}
// Registers one never-refreshed view with auto_refresh=true and a single concurrency
// slot, then checks the worker never counts more than one refresh in flight.
#[tokio::test]
async fn test_concurrent_refresh_limit() {
let storage = create_test_storage();
let scheduler = create_test_scheduler(Arc::clone(&storage));
let catalog = MaterializedViewCatalog::new(&storage);
let schema = Schema::new(vec![Column::new("id", DataType::Int4)]);
let query_plan = LogicalPlan::Scan {
alias: None,
table_name: "test".to_string(),
schema: std::sync::Arc::new(schema.clone()),
projection: None,
as_of: None,
};
let query_plan_bytes = bincode::serialize(&query_plan).unwrap();
let mut metadata = crate::storage::MaterializedViewMetadata::new(
"test_view".to_string(),
"SELECT * FROM test".to_string(),
query_plan_bytes,
vec!["test".to_string()],
schema,
);
metadata.metadata.insert("auto_refresh".to_string(), "true".to_string());
catalog.create_view(metadata).unwrap();
let config = AutoRefreshConfig::new()
.with_enabled(true)
.with_interval_seconds(1)
.with_max_concurrent(1);
let mut worker = AutoRefreshWorker::new(config, Arc::clone(&storage), scheduler);
worker.start().await.unwrap();
tokio::time::sleep(Duration::from_millis(1500)).await;
assert!(worker.active_refresh_count() <= 1);
worker.stop().await.unwrap();
}
// Both command variants can be constructed and pattern-matched.
#[test]
fn test_worker_command_enum() {
let stop_cmd = WorkerCommand::Stop;
let check_cmd = WorkerCommand::CheckNow;
assert!(matches!(stop_cmd, WorkerCommand::Stop));
assert!(matches!(check_cmd, WorkerCommand::CheckNow));
}
}