#![allow(unused_variables)]
use crate::{Result, Tuple, Value, Schema, Column, DataType};
use super::{
StorageEngine, MaterializedViewCatalog, AutoRefreshWorker,
MVScheduler, SchedulerStats, mv_auto_refresh::RefreshHistoryEntry,
};
use std::sync::Arc;
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoRefreshStatus {
pub mv_name: String,
pub auto_refresh_enabled: bool,
pub last_refresh: Option<chrono::DateTime<chrono::Utc>>,
pub staleness_seconds: Option<i64>,
pub threshold_seconds: i64,
pub is_refreshing: bool,
pub refresh_strategy: String,
pub row_count: Option<u64>,
pub base_table_count: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuUsageInfo {
pub current_cpu_percent: f64,
pub max_cpu_percent: f64,
pub is_throttled: bool,
pub active_tasks: usize,
pub queued_tasks: usize,
}
pub struct MvSystemViews {
storage: Arc<StorageEngine>,
auto_refresh_worker: Option<Arc<AutoRefreshWorker>>,
scheduler: Arc<MVScheduler>,
}
impl MvSystemViews {
pub fn new(
storage: Arc<StorageEngine>,
scheduler: Arc<MVScheduler>,
) -> Self {
Self {
storage,
auto_refresh_worker: None,
scheduler,
}
}
pub fn with_auto_refresh_worker(mut self, worker: Arc<AutoRefreshWorker>) -> Self {
self.auto_refresh_worker = Some(worker);
self
}
pub fn pg_mv_auto_refresh_status(&self) -> Result<Vec<AutoRefreshStatus>> {
let catalog = MaterializedViewCatalog::new(&self.storage);
let all_mvs = catalog.list_views()?;
let worker_config = self.auto_refresh_worker.as_ref()
.map(|w| w.config())
.unwrap_or_default();
let scheduler_stats = self.scheduler.get_stats();
let running_mvs: Vec<String> = vec![];
let mut statuses = Vec::new();
for mv_name in all_mvs {
let metadata = catalog.get_view(&mv_name)?;
let auto_refresh_enabled = metadata.metadata
.get("auto_refresh")
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false);
let status = AutoRefreshStatus {
mv_name: mv_name.clone(),
auto_refresh_enabled,
last_refresh: metadata.last_refresh,
staleness_seconds: metadata.staleness_seconds(),
threshold_seconds: worker_config.staleness_threshold_seconds,
is_refreshing: running_mvs.contains(&mv_name),
refresh_strategy: metadata.refresh_strategy.clone(),
row_count: metadata.row_count,
base_table_count: metadata.base_tables.len(),
};
statuses.push(status);
}
Ok(statuses)
}
pub fn pg_mv_refresh_history(&self, limit: Option<usize>) -> Result<Vec<Tuple>> {
if let Some(ref worker) = self.auto_refresh_worker {
let entries = worker.get_refresh_history(limit);
let tuples = entries.iter().map(Self::history_entry_to_tuple).collect();
Ok(tuples)
} else {
Ok(vec![])
}
}
fn history_entry_to_tuple(entry: &RefreshHistoryEntry) -> Tuple {
Tuple::new(vec![
Value::String(entry.mv_name.clone()),
Value::String(entry.start_time.to_rfc3339()),
Value::String(entry.end_time.to_rfc3339()),
Value::Boolean(entry.success),
Value::String(entry.error_message.clone().unwrap_or_default()),
Value::Int8(entry.rows_affected.unwrap_or(0)),
Value::String(entry.strategy.clone()),
Value::String(entry.trigger.clone()),
])
}
pub fn pg_mv_cpu_usage(&self) -> Result<CpuUsageInfo> {
let stats = self.scheduler.get_stats();
let worker_config = self.auto_refresh_worker.as_ref()
.map(|w| w.config())
.unwrap_or_default();
let info = CpuUsageInfo {
current_cpu_percent: stats.cpu_usage,
max_cpu_percent: worker_config.max_cpu_percent,
is_throttled: stats.cpu_usage > worker_config.max_cpu_percent,
active_tasks: stats.running_tasks,
queued_tasks: stats.queue_size,
};
Ok(info)
}
pub fn pg_mv_scheduler_stats(&self) -> Result<SchedulerStats> {
Ok(self.scheduler.get_stats())
}
pub fn status_to_tuple(status: &AutoRefreshStatus) -> Tuple {
Tuple::new(vec![
Value::String(status.mv_name.clone()),
Value::Boolean(status.auto_refresh_enabled),
Value::String(
status.last_refresh
.map(|t| t.to_rfc3339())
.unwrap_or_else(|| "never".to_string())
),
Value::Int8(status.staleness_seconds.unwrap_or(-1)),
Value::Int8(status.threshold_seconds),
Value::Boolean(status.is_refreshing),
Value::String(status.refresh_strategy.clone()),
Value::Int8(status.row_count.unwrap_or(0) as i64),
Value::Int4(status.base_table_count as i32),
])
}
pub fn status_schema() -> Schema {
Schema::new(vec![
Column::new("mv_name", DataType::Text),
Column::new("auto_refresh_enabled", DataType::Boolean),
Column::new("last_refresh", DataType::Text),
Column::new("staleness_seconds", DataType::Int8),
Column::new("threshold_seconds", DataType::Int8),
Column::new("is_refreshing", DataType::Boolean),
Column::new("refresh_strategy", DataType::Text),
Column::new("row_count", DataType::Int8),
Column::new("base_table_count", DataType::Int4),
])
}
pub fn history_schema() -> Schema {
Schema::new(vec![
Column::new("mv_name", DataType::Text),
Column::new("start_time", DataType::Text),
Column::new("end_time", DataType::Text),
Column::new("success", DataType::Boolean),
Column::new("error_message", DataType::Text),
Column::new("rows_affected", DataType::Int8),
Column::new("strategy", DataType::Text),
Column::new("trigger", DataType::Text),
])
}
pub fn cpu_usage_schema() -> Schema {
Schema::new(vec![
Column::new("current_cpu_percent", DataType::Float8),
Column::new("max_cpu_percent", DataType::Float8),
Column::new("is_throttled", DataType::Boolean),
Column::new("active_tasks", DataType::Int4),
Column::new("queued_tasks", DataType::Int4),
])
}
pub fn cpu_usage_to_tuple(info: &CpuUsageInfo) -> Tuple {
Tuple::new(vec![
Value::Float8(info.current_cpu_percent),
Value::Float8(info.max_cpu_percent),
Value::Boolean(info.is_throttled),
Value::Int4(info.active_tasks as i32),
Value::Int4(info.queued_tasks as i32),
])
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use crate::Config;
use super::super::mv_scheduler::SchedulerConfig;
#[test]
fn test_status_schema() {
let schema = MvSystemViews::status_schema();
assert_eq!(schema.columns.len(), 9);
assert_eq!(schema.columns[0].name, "mv_name");
assert_eq!(schema.columns[1].name, "auto_refresh_enabled");
}
#[test]
fn test_history_schema() {
let schema = MvSystemViews::history_schema();
assert_eq!(schema.columns.len(), 8);
assert_eq!(schema.columns[0].name, "mv_name");
assert_eq!(schema.columns[2].name, "end_time");
}
#[test]
fn test_cpu_usage_schema() {
let schema = MvSystemViews::cpu_usage_schema();
assert_eq!(schema.columns.len(), 5);
assert_eq!(schema.columns[0].name, "current_cpu_percent");
assert_eq!(schema.columns[2].name, "is_throttled");
}
#[test]
fn test_system_views_creation() {
let config = Config::in_memory();
let storage = Arc::new(StorageEngine::open_in_memory(&config).unwrap());
let scheduler_config = SchedulerConfig::default();
let scheduler = Arc::new(MVScheduler::new(scheduler_config, Arc::clone(&storage)));
let _system_views = MvSystemViews::new(storage, scheduler);
}
#[test]
fn test_cpu_usage_info() {
let info = CpuUsageInfo {
current_cpu_percent: 45.5,
max_cpu_percent: 70.0,
is_throttled: false,
active_tasks: 2,
queued_tasks: 5,
};
let tuple = MvSystemViews::cpu_usage_to_tuple(&info);
assert_eq!(tuple.values.len(), 5);
match &tuple.values[0] {
Value::Float8(v) => assert_eq!(*v, 45.5),
_ => panic!("Expected Float8"),
}
}
#[test]
fn test_history_entry_to_tuple() {
use chrono::Utc;
let entry = RefreshHistoryEntry {
mv_name: "test_mv".to_string(),
start_time: Utc::now(),
end_time: Utc::now(),
success: true,
error_message: None,
rows_affected: Some(100),
strategy: "incremental".to_string(),
trigger: "staleness".to_string(),
};
let tuple = MvSystemViews::history_entry_to_tuple(&entry);
assert_eq!(tuple.values.len(), 8);
match &tuple.values[0] {
Value::String(s) => assert_eq!(s, "test_mv"),
_ => panic!("Expected String"),
}
match &tuple.values[3] {
Value::Boolean(b) => assert!(*b),
_ => panic!("Expected Boolean"),
}
match &tuple.values[5] {
Value::Int8(n) => assert_eq!(*n, 100),
_ => panic!("Expected Int8"),
}
}
}