use alloc::collections::BTreeMap;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use super::types::*;
use crate::cache::arc::ARC;
use crate::dedup::fastdedup::FastDedupEngine;
use crate::storage::zpl::ZPL;
use crate::time;
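/// Maximum points retained per metric series (8640 samples is e.g. 24 hours
/// at a 10-second sampling interval).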
const MAX_DATA_POINTS: usize = 8640;
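/// In-memory collector of per-dataset storage analytics: usage, I/O, dedup,
/// capacity forecasting, and metric trend history.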
pub struct AnalyticsCollector {
datasets: BTreeMap<String, DatasetMetrics>,
}
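/// Cached state for one dataset: the latest usage/I/O snapshots plus a
/// bounded trend history per metric type.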
struct DatasetMetrics {
usage: UsageSummary,
io: IoStats,
history: BTreeMap<MetricType, Vec<TrendPoint>>,
last_update: u64,
}
impl AnalyticsCollector {
pub fn new() -> Self {
Self {
datasets: BTreeMap::new(),
}
}
fn get_or_create_dataset(&mut self, dataset: &str) -> &mut DatasetMetrics {
self.datasets
.entry(dataset.to_string())
.or_insert_with(|| DatasetMetrics {
usage: UsageSummary::default(),
io: IoStats::default(),
history: BTreeMap::new(),
last_update: 0,
})
}
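/// Records one metric sample: appends it to the trend history (dropping the
/// oldest point once `MAX_DATA_POINTS` is exceeded) and mirrors it into the
/// cached usage/I/O snapshot. `CacheHitRate` is expected as an integer
/// percentage and stored as a fraction.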
pub fn record_metric(&mut self, dataset: &str, metric: MetricType, value: u64) {
let ds = self.get_or_create_dataset(dataset);
let timestamp = time::now();
ds.last_update = timestamp;
let history = ds.history.entry(metric).or_default();
history.push(TrendPoint { timestamp, value });
if history.len() > MAX_DATA_POINTS {
history.remove(0);
}
match metric {
MetricType::UsedSpace => ds.usage.used_space = value,
MetricType::FreeSpace => ds.usage.free_space = value,
MetricType::FileCount => ds.usage.file_count = value,
MetricType::ReadIops => ds.io.read_ops = value,
MetricType::WriteIops => ds.io.write_ops = value,
MetricType::ReadThroughput => ds.io.read_throughput = value,
MetricType::WriteThroughput => ds.io.write_throughput = value,
MetricType::CacheHitRate => ds.io.cache_hit_rate = value as f32 / 100.0,
_ => {}
}
}
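/// Builds a usage summary from live ZPL space accounting plus the fast-dedup
/// engine's hit/miss counters; file, directory, and snapshot counts come
/// from the recorded metrics.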
pub fn get_usage_summary(&self, dataset: &str) -> Result<UsageSummary, AnalyticsError> {
let ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
let zpl = ZPL.lock();
let used_space = zpl.used_bytes();
let quota = zpl.quota();
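// A zero quota means no limit; u64::MAX acts as the unbounded sentinel,
// which the percentage calculation below special-cases.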
let total_capacity = if quota > 0 { quota } else { u64::MAX };
let free_space = total_capacity.saturating_sub(used_space);
let usage_percent = if total_capacity > 0 && total_capacity != u64::MAX {
(used_space as f32 / total_capacity as f32) * 100.0
} else {
0.0
};
drop(zpl);
let (hits, misses, _entries, _evictions, _hit_rate) = FastDedupEngine::get_stats();
let total_ops = hits + misses;
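// Misses approximate unique blocks written and hits approximate duplicates
// avoided, so total_ops / misses estimates the dedup ratio.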
let dedup_ratio = if total_ops > 0 && misses > 0 {
(total_ops as f64 / misses as f64) as f32
} else if total_ops > 0 && misses == 0 {
100.0
} else {
1.0
};
let compress_ratio = 1.0f32;
let reduction_ratio = dedup_ratio * compress_ratio;
Ok(UsageSummary {
total_capacity,
used_space,
free_space,
usage_percent,
file_count: ds.usage.file_count,
directory_count: ds.usage.directory_count,
snapshot_count: ds.usage.snapshot_count,
logical_size: used_space,
physical_size: (used_space as f64 / reduction_ratio.max(1.0) as f64) as u64,
reduction_ratio,
})
}
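/// Estimates how space divides among data, metadata, snapshots, and
/// bookkeeping. The split is heuristic: ~48 bytes per DDT entry, 512 bytes
/// of metadata per estimated 128 KiB object, ~1% of used space per snapshot,
/// 64 bytes per directory index, and 1/32 of capacity held in reserve.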
pub fn get_space_breakdown(&self, dataset: &str) -> Result<SpaceBreakdown, AnalyticsError> {
use crate::dedup::dedup::DDT;
use crate::util::alloc::METASLAB;
let ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
let total_used = ds.usage.used_space;
let free_space = ds.usage.free_space;
let ddt = DDT.lock();
let ddt_entries = ddt.table.len() as u64;
drop(ddt);
let dedup_table_space = ddt_entries * 48;
let metaslab = METASLAB.lock();
let total_capacity = metaslab.total_capacity;
let total_free = metaslab.total_free;
drop(metaslab);
let estimated_objects = total_used / (128 * 1024);
let metadata_space = estimated_objects * 512;
let snapshot_count = ds.usage.snapshot_count;
let snapshot_space = (total_used / 100) * snapshot_count;
let index_space = ds.usage.directory_count * 64;
let reserved_space = total_capacity / 32;
let overhead = metadata_space + dedup_table_space + index_space + reserved_space;
let data_space = total_used.saturating_sub(overhead);
let reclaimable_space = if total_capacity > total_used + total_free {
total_capacity - total_used - total_free
} else {
0
};
Ok(SpaceBreakdown {
data_space,
metadata_space,
snapshot_space,
dedup_table_space,
index_space,
reserved_space,
free_space,
reclaimable_space,
})
}
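/// Returns the recorded I/O counters with the cache hit rate and total read
/// ops refreshed from live ARC statistics.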
pub fn get_io_stats(&self, dataset: &str) -> Result<IoStats, AnalyticsError> {
let ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
let arc = ARC.lock();
let (hits, misses, hit_rate, _cache_size) = arc.stats();
drop(arc);
let mut io = ds.io.clone();
io.cache_hit_rate = hit_rate as f32;
io.read_ops = hits + misses;
Ok(io)
}
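/// Linearly extrapolates used space `days` into the future from the recorded
/// `UsedSpace` history; requires at least two data points. `days_until_full`
/// is -1 when usage is flat or shrinking.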
pub fn forecast_capacity(
&self,
dataset: &str,
days: u32,
) -> Result<CapacityForecast, AnalyticsError> {
let ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
let history = ds
.history
.get(&MetricType::UsedSpace)
.ok_or(AnalyticsError::InsufficientData)?;
if history.len() < 2 {
return Err(AnalyticsError::InsufficientData);
}
let daily_growth = calculate_daily_growth(history);
let current_usage = ds.usage.used_space;
let total_capacity = ds.usage.total_capacity;
let predicted_usage = if daily_growth > 0 {
current_usage.saturating_add((daily_growth as u64).saturating_mul(days as u64))
} else {
current_usage.saturating_sub(daily_growth.unsigned_abs().saturating_mul(days as u64))
};
let days_until_full = if daily_growth <= 0 {
-1
} else if current_usage >= total_capacity {
0
} else {
let remaining = total_capacity - current_usage;
(remaining / daily_growth as u64).min(i32::MAX as u64) as i32
};
let recommendation = match days_until_full {
-1 => ForecastRecommendation::NoAction,
0..=7 => ForecastRecommendation::Critical,
8..=30 => ForecastRecommendation::ExpansionNeeded,
31..=90 => ForecastRecommendation::ConsiderExpansion,
_ => ForecastRecommendation::NoAction,
};
Ok(CapacityForecast {
current_usage,
predicted_usage,
daily_growth_rate: daily_growth,
days_until_full,
recommendation,
confidence: calculate_confidence(history),
})
}
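/// Returns the `limit` largest entries directly under the dataset root,
/// sorted by size. Only the top level is scanned; subdirectories are not
/// recursed into, so a directory's reported size is its own `st_size`.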
pub fn get_top_consumers(
&self,
dataset: &str,
limit: usize,
) -> Result<Vec<SpaceConsumer>, AnalyticsError> {
let ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
let zpl = ZPL.lock();
let root_id = zpl.root_id();
let mut consumers: Vec<SpaceConsumer> = Vec::new();
let total_used = ds.usage.used_space.max(1);
if let Ok(entries) = zpl.readdir(root_id) {
for entry in entries {
if entry.name == "." || entry.name == ".." {
continue;
}
if let Ok(stat) = zpl.getattr(entry.object_id) {
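// Mask the file-type bits (S_IFMT) and test for a directory (S_IFDIR).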
let is_dir = (stat.st_mode & 0o170000) == 0o040000;
consumers.push(SpaceConsumer {
path: alloc::format!("/{}", entry.name),
object_id: entry.object_id,
space_used: stat.st_size as u64,
percent_of_total: (stat.st_size as f64 / total_used as f64 * 100.0) as f32,
is_directory: is_dir,
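// For directories, st_nlink serves as a rough entry count.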
file_count: if is_dir { stat.st_nlink } else { 0 },
});
}
}
}
consumers.sort_by(|a, b| b.space_used.cmp(&a.space_used));
consumers.truncate(limit);
Ok(consumers)
}
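/// Groups regular files under the dataset root by filename extension and
/// reports per-type count, total size, average size, and share of usage.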
pub fn get_file_type_distribution(
&self,
dataset: &str,
) -> Result<Vec<FileTypeStats>, AnalyticsError> {
let ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
let zpl = ZPL.lock();
let root_id = zpl.root_id();
let total_used = ds.usage.used_space.max(1);
let mut type_map: BTreeMap<String, (u64, u64)> = BTreeMap::new();
if let Ok(entries) = zpl.readdir(root_id) {
for entry in entries {
if entry.name == "." || entry.name == ".." {
continue;
}
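// file_type 8 is DT_REG in the usual dirent encoding: only regular files
// are classified by extension.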
if entry.file_type == 8 {
if let Ok(stat) = zpl.getattr(entry.object_id) {
let ext = entry
.name
.rsplit('.')
.next()
.filter(|e| e.len() <= 10 && *e != entry.name)
.map(|e| e.to_string())
.unwrap_or_else(|| "other".to_string());
let stats = type_map.entry(ext).or_insert((0, 0));
stats.0 += 1;
stats.1 += stat.st_size as u64;
}
}
}
}
let mut result: Vec<FileTypeStats> = type_map
.into_iter()
.map(|(file_type, (count, total_size))| FileTypeStats {
file_type,
count,
total_size,
avg_size: if count > 0 { total_size / count } else { 0 },
percent_of_total: (total_size as f64 / total_used as f64 * 100.0) as f32,
})
.collect();
result.sort_by(|a, b| b.total_size.cmp(&a.total_size));
Ok(result)
}
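/// Derives dedup statistics from the fast-dedup engine counters, assuming a
/// fixed 128 KiB block size and ~48 bytes per dedup-table entry.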
pub fn get_dedup_stats(&self, dataset: &str) -> Result<DedupStats, AnalyticsError> {
let _ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
let (hits, misses, entries, _evictions, _hit_rate) = FastDedupEngine::get_stats();
const BLOCK_SIZE: u64 = 128 * 1024;
let total_blocks = hits + misses;
let unique_blocks = entries as u64;
let dedup_blocks = hits;
let space_saved = dedup_blocks * BLOCK_SIZE;
let dedup_ratio = if unique_blocks > 0 {
(total_blocks as f32) / (unique_blocks as f32)
} else {
1.0
};
let table_entries = entries as u64;
let table_size = table_entries * 48;
Ok(DedupStats {
total_blocks,
unique_blocks,
dedup_blocks,
space_saved,
dedup_ratio,
table_entries,
table_size,
})
}
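/// Placeholder: compression statistics are not tracked yet; returns defaults
/// once the dataset is known to exist.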
pub fn get_compression_stats(&self, dataset: &str) -> Result<CompressionStats, AnalyticsError> {
let _ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
Ok(CompressionStats::default())
}
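/// Placeholder: snapshot usage is not tracked yet; returns defaults for a
/// known dataset.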
pub fn get_snapshot_usage(&self, dataset: &str) -> Result<SnapshotUsage, AnalyticsError> {
let _ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
Ok(SnapshotUsage::default())
}
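/// Returns the metric's history downsampled to a point count appropriate for
/// the requested period (e.g. 60 points for an hour, 168 for a week).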
pub fn get_trend(
&self,
dataset: &str,
metric: MetricType,
period: TrendPeriod,
) -> Result<Vec<TrendPoint>, AnalyticsError> {
let ds = self
.datasets
.get(dataset)
.ok_or_else(|| AnalyticsError::DatasetNotFound(dataset.to_string()))?;
let history = ds
.history
.get(&metric)
.ok_or(AnalyticsError::InsufficientData)?;
let points_to_return = match period {
TrendPeriod::Hour => 60,
TrendPeriod::Day => 24,
TrendPeriod::Week => 168,
TrendPeriod::Month => 30,
TrendPeriod::Year => 52,
};
let result = sample_history(history, points_to_return);
Ok(result)
}
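/// Assembles every analytics view into one report. A failed capacity
/// forecast (e.g. insufficient history) falls back to the default rather
/// than failing the whole report.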
pub fn generate_full_report(&self, dataset: &str) -> Result<FullReport, AnalyticsError> {
let usage = self.get_usage_summary(dataset)?;
let space = self.get_space_breakdown(dataset)?;
let io = self.get_io_stats(dataset)?;
let forecast = self.forecast_capacity(dataset, 30).unwrap_or_default();
let dedup = self.get_dedup_stats(dataset)?;
let compression = self.get_compression_stats(dataset)?;
let snapshots = self.get_snapshot_usage(dataset)?;
let top_consumers = self.get_top_consumers(dataset, 10)?;
let file_types = self.get_file_type_distribution(dataset)?;
let health_score = calculate_health_score(&usage, &io, &forecast);
let recommendations = generate_recommendations(&usage, &forecast, &dedup);
Ok(FullReport {
generated_at: time::now(),
dataset: dataset.to_string(),
usage,
space,
io,
forecast,
dedup,
compression,
snapshots,
top_consumers,
file_types,
health_score,
recommendations,
})
}
}
impl Default for AnalyticsCollector {
fn default() -> Self {
Self::new()
}
}
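/// Two-point slope estimate: net growth between the first and last recorded
/// points, averaged per day. Timestamps are assumed to be in seconds.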
fn calculate_daily_growth(history: &[TrendPoint]) -> i64 {
if history.len() < 2 {
return 0;
}
let first = &history[0];
let last = &history[history.len() - 1];
let time_diff = last.timestamp.saturating_sub(first.timestamp);
if time_diff == 0 {
return 0;
}
let value_diff = last.value as i64 - first.value as i64;
let days = (time_diff / 86400).max(1);
value_diff / days as i64
}
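/// Maps sample count to a coarse confidence score: the more history, the
/// more trust in the linear extrapolation.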
fn calculate_confidence(history: &[TrendPoint]) -> f32 {
let data_points = history.len();
if data_points < 10 {
0.3
} else if data_points < 100 {
0.5
} else if data_points < 1000 {
0.7
} else {
0.9
}
}
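/// Downsamples a history to at most `count` evenly spaced points.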
fn sample_history(history: &[TrendPoint], count: usize) -> Vec<TrendPoint> {
if count == 0 {
return Vec::new();
}
if history.len() <= count {
return history.to_vec();
}
let step = history.len() / count;
history.iter().step_by(step).take(count).cloned().collect()
}
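/// Scores overall health from 0-100, deducting for high usage, a cold cache,
/// and an urgent capacity forecast.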
fn calculate_health_score(usage: &UsageSummary, io: &IoStats, forecast: &CapacityForecast) -> u8 {
let mut score: u8 = 100;
if usage.usage_percent > 90.0 {
score = score.saturating_sub(30);
} else if usage.usage_percent > 80.0 {
score = score.saturating_sub(15);
} else if usage.usage_percent > 70.0 {
score = score.saturating_sub(5);
}
if io.cache_hit_rate < 0.5 {
score = score.saturating_sub(10);
}
match forecast.recommendation {
ForecastRecommendation::Critical => score = score.saturating_sub(25),
ForecastRecommendation::ExpansionNeeded => score = score.saturating_sub(10),
_ => {}
}
score
}
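/// Emits human-readable recommendations for high usage, imminent capacity
/// exhaustion, and poor dedup or data-reduction ratios.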
fn generate_recommendations(
usage: &UsageSummary,
forecast: &CapacityForecast,
dedup: &DedupStats,
) -> Vec<String> {
let mut recommendations = Vec::new();
if usage.usage_percent > 85.0 {
recommendations.push(
"Storage usage is high. Consider adding capacity or cleaning up old data.".to_string(),
);
}
if forecast.recommendation == ForecastRecommendation::Critical {
recommendations
.push("Critical: Storage will be full soon. Immediate action required.".to_string());
}
if dedup.dedup_ratio < 1.1 && dedup.total_blocks > 0 {
recommendations.push(
"Low deduplication ratio. Consider if dedup is beneficial for this workload."
.to_string(),
);
}
if usage.reduction_ratio < 1.2 {
recommendations.push(
"Low data reduction. Review compression settings for better efficiency.".to_string(),
);
}
recommendations
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_collector_new() {
let collector = AnalyticsCollector::new();
assert!(collector.datasets.is_empty());
}
#[test]
fn test_record_metric() {
let mut collector = AnalyticsCollector::new();
collector.record_metric("test", MetricType::FileCount, 1000);
let usage = collector.get_usage_summary("test").unwrap();
assert_eq!(usage.file_count, 1000);
}
#[test]
fn test_health_score() {
let usage = UsageSummary {
usage_percent: 50.0,
..Default::default()
};
let io = IoStats {
cache_hit_rate: 0.9,
..Default::default()
};
let forecast = CapacityForecast::default();
let score = calculate_health_score(&usage, &io, &forecast);
assert_eq!(score, 100);
}
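// Sketch tests for the pure helpers above; they assume TrendPoint is Clone
// with u64 `timestamp` (seconds) and `value` fields, as constructed in
// record_metric.
#[test]
fn test_daily_growth() {
// Two points two days apart with 200 units of growth -> 100 units/day.
let history = [
TrendPoint { timestamp: 0, value: 0 },
TrendPoint { timestamp: 2 * 86400, value: 200 },
];
assert_eq!(calculate_daily_growth(&history), 100);
}
#[test]
fn test_sample_history() {
// Ten points sampled down to five keeps every second point.
let history: Vec<TrendPoint> = (0u64..10)
.map(|i| TrendPoint { timestamp: i, value: i })
.collect();
let sampled = sample_history(&history, 5);
assert_eq!(sampled.len(), 5);
assert_eq!(sampled[1].value, 2);
assert!(sample_history(&history, 0).is_empty());
}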
}