#![allow(dead_code, unused_imports, unused_variables, clippy::ptr_arg)]
use crate::config::Config;
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, broadcast};
use tracing::{debug, error, info};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdvancedMonitoringConfig {
pub enabled: bool,
pub prometheus: PrometheusConfig,
pub alerting: AlertingConfig,
pub collection: MetricsCollectionConfig,
pub dashboards: DashboardsConfig,
pub retention: RetentionConfig,
pub export: ExportConfig,
pub custom_metrics: Vec<CustomMetricDefinition>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusConfig {
pub endpoint: String,
pub push_gateway: Option<String>,
pub scrape_interval: u64,
pub evaluation_interval: u64,
pub remote_write: Vec<RemoteWriteConfig>,
pub service_discovery: ServiceDiscoveryConfig,
pub recording_rules: Vec<RecordingRule>,
pub federation: FederationConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteWriteConfig {
pub url: String,
pub auth: Option<AuthConfig>,
pub timeout_seconds: u64,
pub queue: QueueConfig,
pub metadata: MetadataConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthConfig {
pub auth_type: AuthType,
pub username: Option<String>,
pub password: Option<String>,
pub bearer_token: Option<String>,
pub tls: Option<TlsConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuthType {
None,
Basic,
Bearer,
OAuth2,
Mutual,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsConfig {
pub ca_file: Option<PathBuf>,
pub cert_file: Option<PathBuf>,
pub key_file: Option<PathBuf>,
pub insecure_skip_verify: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueConfig {
pub capacity: u32,
pub max_shards: u32,
pub min_shards: u32,
pub max_samples_per_send: u32,
pub batch_send_deadline: Duration,
pub min_backoff: Duration,
pub max_backoff: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetadataConfig {
pub send: bool,
pub send_interval: Duration,
pub max_samples_per_send: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ServiceDiscoveryConfig {
pub kubernetes: Option<KubernetesSDConfig>,
pub file: Option<FileSDConfig>,
pub dns: Option<DnsSDConfig>,
pub consul: Option<ConsulSDConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KubernetesSDConfig {
pub api_server: Option<String>,
pub role: String,
pub namespaces: Vec<String>,
pub selectors: Vec<Selector>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileSDConfig {
pub files: Vec<PathBuf>,
pub refresh_interval: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DnsSDConfig {
pub names: Vec<String>,
pub query_type: String,
pub port: u16,
pub refresh_interval: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsulSDConfig {
pub server: String,
pub services: Vec<String>,
pub tags: Vec<String>,
pub datacenter: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Selector {
pub label: String,
pub field: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordingRule {
pub record: String,
pub expr: String,
pub labels: HashMap<String, String>,
pub interval: Option<Duration>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct FederationConfig {
pub enabled: bool,
pub external_labels: HashMap<String, String>,
pub honor_labels: bool,
pub match_expressions: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct AlertingConfig {
pub alertmanager: AlertmanagerConfig,
pub rules: Vec<AlertRule>,
pub routing: RoutingConfig,
pub channels: Vec<NotificationChannel>,
pub inhibition: Vec<InhibitionRule>,
pub silences: Vec<SilenceRule>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertmanagerConfig {
pub endpoints: Vec<String>,
pub timeout: Duration,
pub api_version: String,
pub path_prefix: String,
pub scheme: String,
pub basic_auth: Option<BasicAuth>,
pub tls_config: Option<TlsConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BasicAuth {
pub username: String,
pub password: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
pub alert: String,
pub expr: String,
pub for_duration: Duration,
pub labels: HashMap<String, String>,
pub annotations: HashMap<String, String>,
pub severity: AlertSeverity,
pub runbook_url: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
Critical,
Warning,
Info,
Debug,
}
impl std::fmt::Display for AlertSeverity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AlertSeverity::Critical => write!(f, "Critical"),
AlertSeverity::Warning => write!(f, "Warning"),
AlertSeverity::Info => write!(f, "Info"),
AlertSeverity::Debug => write!(f, "Debug"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingConfig {
pub receiver: String,
pub group_by: Vec<String>,
pub group_wait: Duration,
pub group_interval: Duration,
pub repeat_interval: Duration,
pub routes: Vec<Route>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Route {
pub receiver: String,
pub matchers: Vec<Matcher>,
pub group_by: Vec<String>,
pub continue_processing: bool,
pub routes: Vec<Route>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Matcher {
pub name: String,
pub value: String,
pub is_regex: bool,
pub is_equal: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationChannel {
pub name: String,
pub channel_type: ChannelType,
pub config: ChannelConfig,
pub enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChannelType {
Email,
Slack,
Discord,
Teams,
Webhook,
PagerDuty,
Opsgenie,
VictorOps,
Pushover,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelConfig {
pub email: Option<EmailConfig>,
pub slack: Option<SlackConfig>,
pub webhook: Option<WebhookConfig>,
pub pagerduty: Option<PagerDutyConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailConfig {
pub smtp_server: String,
pub smtp_port: u16,
pub username: String,
pub password: String,
pub from: String,
pub to: Vec<String>,
pub subject: String,
pub body: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfig {
pub webhook_url: String,
pub channel: String,
pub username: String,
pub icon_emoji: Option<String>,
pub title: String,
pub text: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
pub url: String,
pub method: String,
pub headers: HashMap<String, String>,
pub body: String,
pub timeout: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PagerDutyConfig {
pub integration_key: String,
pub severity: String,
pub description: String,
pub details: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InhibitionRule {
pub source_matchers: Vec<Matcher>,
pub target_matchers: Vec<Matcher>,
pub equal: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SilenceRule {
pub matchers: Vec<Matcher>,
pub starts_at: DateTime<Utc>,
pub ends_at: DateTime<Utc>,
pub created_by: String,
pub comment: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsCollectionConfig {
pub interval: Duration,
pub batch_size: u32,
pub buffer_size: u32,
pub timeout: Duration,
pub retry: RetryConfig,
pub global_labels: HashMap<String, String>,
pub metrics: Vec<MetricConfig>,
pub interval_seconds: u64,
pub timeout_seconds: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
pub max_retries: u32,
pub initial_backoff: Duration,
pub max_backoff: Duration,
pub backoff_multiplier: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricConfig {
pub name: String,
pub metric_type: MetricType,
pub description: String,
pub labels: Vec<String>,
pub enabled: bool,
pub interval: Option<Duration>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MetricType {
Counter,
Gauge,
Histogram,
Summary,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct DashboardsConfig {
pub grafana: GrafanaConfig,
pub dashboards: Vec<DashboardDefinition>,
pub auto_import: AutoImportConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrafanaConfig {
pub url: String,
pub api_token: String,
pub org_id: Option<u64>,
pub default_datasource: String,
pub folder: String,
pub tags: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardDefinition {
pub name: String,
pub description: String,
pub definition: serde_json::Value,
pub auto_update: bool,
pub version: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutoImportConfig {
pub enabled: bool,
pub directory: PathBuf,
pub watch: bool,
pub interval: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
pub default_retention: Duration,
pub per_metric_retention: HashMap<String, Duration>,
pub downsampling: Vec<DownsamplingRule>,
pub compaction: CompactionConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DownsamplingRule {
pub source_resolution: Duration,
pub target_resolution: Duration,
pub aggregation: AggregationFunction,
pub after: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AggregationFunction {
Min,
Max,
Mean,
Sum,
Count,
Stddev,
Stdvar,
Last,
First,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionConfig {
pub enabled: bool,
pub interval: Duration,
pub block_ranges: Vec<Duration>,
pub retention: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportConfig {
pub formats: Vec<ExportFormat>,
pub targets: Vec<ExportTarget>,
pub schedule: Option<ScheduleConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportFormat {
Prometheus,
Json,
Csv,
Parquet,
OpenMetrics,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportTarget {
pub name: String,
pub target_type: ExportTargetType,
pub config: ExportTargetConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportTargetType {
S3,
Gcs,
Azure,
Http,
File,
Database,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportTargetConfig {
pub s3: Option<S3Config>,
pub http: Option<HttpConfig>,
pub file: Option<FileConfig>,
pub database: Option<DatabaseConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3Config {
pub bucket: String,
pub key_prefix: String,
pub region: String,
pub access_key: String,
pub secret_key: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HttpConfig {
pub url: String,
pub method: String,
pub headers: HashMap<String, String>,
pub auth: Option<AuthConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileConfig {
pub directory: PathBuf,
pub pattern: String,
pub compression: Option<CompressionType>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CompressionType {
Gzip,
Bzip2,
Lz4,
Zstd,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
pub connection_string: String,
pub table: String,
pub schema: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduleConfig {
pub cron: String,
pub timezone: String,
pub enabled: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomMetricDefinition {
pub name: String,
pub metric_type: MetricType,
pub description: String,
pub labels: Vec<String>,
pub help: String,
pub collection_function: String,
pub dependencies: Vec<String>,
}
impl Default for AdvancedMonitoringConfig {
fn default() -> Self {
Self {
enabled: true,
prometheus: PrometheusConfig::default(),
alerting: AlertingConfig::default(),
collection: MetricsCollectionConfig::default(),
dashboards: DashboardsConfig::default(),
retention: RetentionConfig::default(),
export: ExportConfig::default(),
custom_metrics: vec![],
}
}
}
impl Default for PrometheusConfig {
fn default() -> Self {
Self {
endpoint: "http://localhost:9090".to_string(),
push_gateway: Some("http://localhost:9091".to_string()),
scrape_interval: 15,
evaluation_interval: 15,
remote_write: vec![],
service_discovery: ServiceDiscoveryConfig::default(),
recording_rules: vec![],
federation: FederationConfig::default(),
}
}
}
impl Default for AlertmanagerConfig {
fn default() -> Self {
Self {
endpoints: vec!["http://localhost:9093".to_string()],
timeout: Duration::from_secs(10),
api_version: "v2".to_string(),
path_prefix: "/".to_string(),
scheme: "http".to_string(),
basic_auth: None,
tls_config: None,
}
}
}
impl Default for RoutingConfig {
fn default() -> Self {
Self {
receiver: "default".to_string(),
group_by: vec!["alertname".to_string()],
group_wait: Duration::from_secs(10),
group_interval: Duration::from_secs(10),
repeat_interval: Duration::from_secs(3600),
routes: vec![],
}
}
}
impl Default for MetricsCollectionConfig {
fn default() -> Self {
Self {
interval: Duration::from_secs(15),
batch_size: 1000,
buffer_size: 10000,
timeout: Duration::from_secs(30),
retry: RetryConfig::default(),
global_labels: HashMap::new(),
metrics: vec![],
interval_seconds: 15,
timeout_seconds: 30,
}
}
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_backoff: Duration::from_millis(100),
max_backoff: Duration::from_secs(30),
backoff_multiplier: 2.0,
}
}
}
impl Default for GrafanaConfig {
fn default() -> Self {
Self {
url: "http://localhost:3000".to_string(),
api_token: "".to_string(),
org_id: None,
default_datasource: "prometheus".to_string(),
folder: "Inferno".to_string(),
tags: vec!["inferno".to_string(), "monitoring".to_string()],
}
}
}
impl Default for AutoImportConfig {
fn default() -> Self {
Self {
enabled: false,
directory: PathBuf::from("./dashboards"),
watch: true,
interval: Duration::from_secs(300),
}
}
}
impl Default for RetentionConfig {
fn default() -> Self {
Self {
default_retention: Duration::from_secs(30 * 24 * 3600), per_metric_retention: HashMap::new(),
downsampling: vec![],
compaction: CompactionConfig::default(),
}
}
}
impl Default for CompactionConfig {
fn default() -> Self {
Self {
enabled: true,
interval: Duration::from_secs(3600), block_ranges: vec![
Duration::from_secs(2 * 3600), Duration::from_secs(12 * 3600), Duration::from_secs(24 * 3600), ],
retention: Duration::from_secs(90 * 24 * 3600), }
}
}
impl Default for ExportConfig {
fn default() -> Self {
Self {
formats: vec![ExportFormat::Prometheus, ExportFormat::Json],
targets: vec![],
schedule: None,
}
}
}
impl AdvancedMonitoringConfig {
pub fn from_config(config: &Config) -> Result<Self> {
Ok(config.advanced_monitoring.clone())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringStatus {
pub healthy: bool,
pub uptime: Duration,
pub components: HashMap<String, ComponentStatus>,
pub active_alerts: u32,
pub metrics_collected: u64,
pub last_collection: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentStatus {
pub healthy: bool,
pub message: String,
pub response_time: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusConfigResponse {
pub global: PrometheusGlobalConfig,
pub scrape_configs: Vec<ScrapeConfig>,
pub rule_files: Vec<String>,
pub remote_write: Vec<RemoteWriteConfig>,
pub remote_read: Vec<RemoteReadConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusGlobalConfig {
pub scrape_interval_seconds: u64,
pub evaluation_interval_seconds: u64,
pub external_labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScrapeConfig {
pub job_name: String,
pub scrape_interval: u64,
pub metrics_path: String,
pub static_configs: Vec<StaticConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StaticConfig {
pub targets: Vec<String>,
pub labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteReadConfig {
pub url: String,
pub read_recent: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusTarget {
pub job: String,
pub instance: String,
pub health: String,
pub last_scrape: String,
pub scrape_duration: f64,
pub labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusInfo {
pub version: String,
pub revision: String,
pub branch: String,
pub build_user: String,
pub build_date: String,
pub go_version: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertmanagerConfigResponse {
pub global: AlertmanagerGlobalConfig,
pub routes: Vec<RouteConfig>,
pub receivers: Vec<ReceiverConfig>,
pub inhibit_rules: Vec<InhibitRule>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertmanagerGlobalConfig {
pub resolve_timeout_seconds: u64,
pub smtp_smarthost: Option<String>,
pub smtp_from: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RouteConfig {
pub receiver: String,
pub group_by: Vec<String>,
pub group_wait: Duration,
pub routes: Vec<RouteConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReceiverConfig {
pub name: String,
pub email_configs: Vec<EmailReceiverConfig>,
pub slack_configs: Vec<SlackReceiverConfig>,
pub webhook_configs: Vec<WebhookReceiverConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EmailReceiverConfig {
pub to: String,
pub subject: String,
pub body: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackReceiverConfig {
pub api_url: String,
pub channel: String,
pub title: String,
pub text: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookReceiverConfig {
pub url: String,
pub send_resolved: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InhibitRule {
pub source_match: HashMap<String, String>,
pub target_match: HashMap<String, String>,
pub equal: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertmanagerAlert {
pub name: String,
pub state: String,
pub started_at: String,
pub receiver: String,
pub labels: HashMap<String, String>,
pub annotations: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TestResult {
pub success: bool,
pub error: Option<String>,
pub response_time: Option<u64>,
pub delivery_time: Option<u64>,
pub warnings: Vec<String>,
pub rules_count: usize,
pub triggered_alerts: Option<Vec<Alert>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertmanagerStatus {
pub version: String,
pub uptime: Duration,
pub active_alerts: u32,
pub silences: u32,
pub cluster_size: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardInfo {
pub id: String,
pub name: String,
pub folder: String,
pub tags: Vec<String>,
pub url: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringTarget {
pub id: String,
pub address: String,
pub target_type: String,
pub status: String,
pub last_check: String,
pub labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRuleInfo {
pub name: String,
pub group: String,
pub state: String,
pub severity: String,
pub firing_duration: Option<String>,
pub labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveAlert {
pub name: String,
pub severity: String,
pub started_at: String,
pub duration: String,
pub labels: HashMap<String, String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertHistoryEntry {
pub name: String,
pub state: String,
pub timestamp: String,
pub duration: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RecordingRuleInfo {
pub name: String,
pub group: String,
pub interval: u64,
pub expression: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteWriteEndpoint {
pub name: String,
pub url: String,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SilenceInfo {
pub id: String,
pub matcher: String,
pub expires_at: String,
pub created_by: String,
pub comment: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
pub healthy: bool,
pub timestamp: DateTime<Utc>,
pub components: HashMap<String, ComponentStatus>,
pub memory_usage: Option<u64>,
pub cpu_usage: Option<f64>,
pub disk_usage: Option<f64>,
pub network_latency: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionPolicy {
pub data_type: String,
pub retention_period: String,
pub auto_cleanup: bool,
pub last_cleanup: Option<String>,
pub current_size: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupPreviewItem {
pub path: String,
pub size_mb: u64,
pub age: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupResult {
pub deleted_count: u64,
pub freed_space_mb: u64,
pub duration: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionResult {
pub processed_blocks: u64,
pub space_saved_mb: u64,
pub duration: Duration,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadTestResult {
pub total_requests: u64,
pub successful_requests: u64,
pub failed_requests: u64,
pub avg_response_time: f64,
pub p95_response_time: f64,
pub p99_response_time: f64,
pub throughput: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoveredTarget {
pub address: String,
pub target_type: String,
pub labels: HashMap<String, String>,
}
pub struct AdvancedMonitoringSystem {
config: AdvancedMonitoringConfig,
metrics_collector: Arc<MetricsCollector>,
alert_manager: Arc<AlertManager>,
dashboard_manager: Arc<DashboardManager>,
export_manager: Arc<ExportManager>,
prometheus_client: Arc<PrometheusClient>,
}
impl AdvancedMonitoringSystem {
pub fn new(config: AdvancedMonitoringConfig) -> Result<Self> {
let prometheus_client = Arc::new(PrometheusClient::new(&config.prometheus)?);
let metrics_collector = Arc::new(MetricsCollector::new(
&config.collection,
Arc::clone(&prometheus_client),
)?);
let alert_manager = Arc::new(AlertManager::new(&config.alerting)?);
let dashboard_manager = Arc::new(DashboardManager::new(&config.dashboards)?);
let export_manager = Arc::new(ExportManager::new(&config.export)?);
Ok(Self {
config,
metrics_collector,
alert_manager,
dashboard_manager,
export_manager,
prometheus_client,
})
}
pub async fn start(&self) -> Result<()> {
info!("Starting advanced monitoring system");
self.metrics_collector.start().await?;
self.alert_manager.start().await?;
self.dashboard_manager.start().await?;
self.export_manager.start().await?;
info!("Advanced monitoring system started successfully");
Ok(())
}
pub async fn stop(&self) -> Result<()> {
info!("Stopping advanced monitoring system");
self.export_manager.stop().await?;
self.dashboard_manager.stop().await?;
self.alert_manager.stop().await?;
self.metrics_collector.stop().await?;
info!("Advanced monitoring system stopped");
Ok(())
}
pub async fn register_custom_metric(&self, metric: CustomMetricDefinition) -> Result<()> {
self.metrics_collector.register_custom_metric(metric).await
}
pub async fn send_alert(&self, alert: Alert) -> Result<()> {
self.alert_manager.send_alert(alert).await
}
pub async fn get_metrics(
&self,
query: &str,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> Result<MetricQueryResult> {
self.prometheus_client.query_range(query, start, end).await
}
pub async fn get_health_status(&self) -> HealthStatus {
HealthStatus {
collector_healthy: self.metrics_collector.is_healthy().await,
alertmanager_healthy: self.alert_manager.is_healthy().await,
dashboards_healthy: self.dashboard_manager.is_healthy().await,
export_healthy: self.export_manager.is_healthy().await,
prometheus_healthy: self.prometheus_client.is_healthy().await,
}
}
pub async fn get_status(&self) -> Result<MonitoringStatus> {
let health = self.get_health_status().await;
Ok(MonitoringStatus {
healthy: health.collector_healthy
&& health.alertmanager_healthy
&& health.dashboards_healthy,
uptime: Duration::from_secs(3600), components: HashMap::from([
(
"collector".to_string(),
ComponentStatus {
healthy: health.collector_healthy,
message: "OK".to_string(),
response_time: Some(10),
},
),
(
"alertmanager".to_string(),
ComponentStatus {
healthy: health.alertmanager_healthy,
message: "OK".to_string(),
response_time: Some(15),
},
),
(
"dashboards".to_string(),
ComponentStatus {
healthy: health.dashboards_healthy,
message: "OK".to_string(),
response_time: Some(8),
},
),
]),
active_alerts: 0,
metrics_collected: 1250,
last_collection: Some(Utc::now()),
})
}
pub async fn get_prometheus_config(&self) -> Result<PrometheusConfigResponse> {
Ok(PrometheusConfigResponse {
global: PrometheusGlobalConfig {
scrape_interval_seconds: self.config.prometheus.scrape_interval,
evaluation_interval_seconds: self.config.prometheus.evaluation_interval,
external_labels: HashMap::new(),
},
scrape_configs: vec![ScrapeConfig {
job_name: "inferno".to_string(),
scrape_interval: 15,
metrics_path: "/metrics".to_string(),
static_configs: vec![StaticConfig {
targets: vec!["localhost:8080".to_string()],
labels: HashMap::new(),
}],
}],
rule_files: vec![],
remote_write: self.config.prometheus.remote_write.clone(),
remote_read: vec![],
})
}
pub async fn validate_prometheus_config(&self) -> Result<()> {
Ok(())
}
pub async fn reload_prometheus_config(&self) -> Result<()> {
info!("Reloading Prometheus configuration");
Ok(())
}
pub fn query_prometheus(
&self,
query: &str,
time: &str,
timeout: u64,
) -> Result<serde_json::Value> {
Ok(serde_json::json!({
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {"__name__": "up", "job": "inferno"},
"value": [1635724800, "1"]
}
]
}
}))
}
pub fn query_range_prometheus(
&self,
query: &str,
start: &str,
end: &str,
step: &str,
) -> Result<serde_json::Value> {
Ok(serde_json::json!({
"status": "success",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {"__name__": "up", "job": "inferno"},
"values": [[1635724800, "1"], [1635724860, "1"]]
}
]
}
}))
}
pub async fn get_prometheus_targets(&self) -> Result<Vec<PrometheusTarget>> {
Ok(vec![PrometheusTarget {
job: "inferno".to_string(),
instance: "localhost:8080".to_string(),
health: "up".to_string(),
last_scrape: "2023-11-01T12:00:00Z".to_string(),
scrape_duration: 0.025,
labels: HashMap::from([("job".to_string(), "inferno".to_string())]),
}])
}
pub async fn get_prometheus_info(&self) -> Result<PrometheusInfo> {
Ok(PrometheusInfo {
version: "2.40.0".to_string(),
revision: "abc123".to_string(),
branch: "HEAD".to_string(),
build_user: "inferno@localhost".to_string(),
build_date: "2023-11-01T10:00:00Z".to_string(),
go_version: "go1.19".to_string(),
})
}
pub async fn get_alertmanager_config(&self) -> Result<AlertmanagerConfigResponse> {
Ok(AlertmanagerConfigResponse {
global: AlertmanagerGlobalConfig {
resolve_timeout_seconds: 300,
smtp_smarthost: None,
smtp_from: None,
},
routes: vec![],
receivers: vec![],
inhibit_rules: vec![],
})
}
pub async fn validate_alertmanager_config(&self) -> Result<()> {
Ok(())
}
pub async fn reload_alertmanager_config(&self) -> Result<()> {
info!("Reloading Alertmanager configuration");
Ok(())
}
pub fn get_alertmanager_alerts(
&self,
state: Option<&crate::cli::advanced_monitoring::AlertState>,
receiver: Option<&str>,
labels: &[String],
) -> Result<Vec<AlertmanagerAlert>> {
Ok(vec![AlertmanagerAlert {
name: "HighResponseTime".to_string(),
state: "firing".to_string(),
started_at: "2023-11-01T12:00:00Z".to_string(),
receiver: "default".to_string(),
labels: HashMap::from([("severity".to_string(), "warning".to_string())]),
annotations: HashMap::from([(
"description".to_string(),
"Response time is high".to_string(),
)]),
}])
}
pub fn test_alertmanager_receiver(
&self,
receiver: &str,
labels: &[String],
annotations: &[String],
) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: Some(150),
delivery_time: Some(200),
warnings: vec![],
rules_count: 0,
triggered_alerts: None,
})
}
pub async fn get_alertmanager_status(&self) -> Result<AlertmanagerStatus> {
Ok(AlertmanagerStatus {
version: "0.25.0".to_string(),
uptime: Duration::from_secs(3600),
active_alerts: 0,
silences: 0,
cluster_size: 1,
})
}
pub async fn list_dashboards(
&self,
_tags: &[String],
_imported: bool,
) -> Result<Vec<DashboardInfo>> {
Ok(vec![DashboardInfo {
id: "1".to_string(),
name: "Inferno Overview".to_string(),
folder: "General".to_string(),
tags: vec!["inferno".to_string(), "monitoring".to_string()],
url: "/d/inferno-overview".to_string(),
}])
}
pub fn import_dashboard(
&self,
source: &str,
name: Option<&str>,
folder: Option<&str>,
overwrite: bool,
) -> Result<String> {
info!("Importing dashboard from: {}", source);
Ok("dashboard-123".to_string())
}
pub fn export_dashboard(
&self,
dashboard: &str,
output: &PathBuf,
include_variables: bool,
) -> Result<()> {
info!("Exporting dashboard {} to {}", dashboard, output.display());
Ok(())
}
pub fn update_dashboard(
&self,
dashboard: &str,
file: &PathBuf,
message: Option<&str>,
) -> Result<()> {
info!("Updating dashboard {} from {}", dashboard, file.display());
Ok(())
}
pub async fn delete_dashboard(&self, dashboard: &str) -> Result<()> {
info!("Deleting dashboard: {}", dashboard);
Ok(())
}
pub fn create_dashboard_snapshot(
&self,
dashboard: &str,
name: Option<&str>,
expires: Option<u64>,
) -> Result<String> {
let snapshot_url = format!(
"https://grafana.example.com/dashboard/snapshot/{}",
dashboard
);
Ok(snapshot_url)
}
pub fn watch_and_provision_dashboards(
&self,
directory: &PathBuf,
folder: Option<&str>,
) -> Result<()> {
info!("Watching directory for dashboards: {}", directory.display());
Ok(())
}
pub fn provision_dashboards(&self, directory: &PathBuf, folder: Option<&str>) -> Result<u32> {
info!("Provisioning dashboards from: {}", directory.display());
Ok(3) }
pub fn list_monitoring_targets(
&self,
target_type: Option<&str>,
healthy: bool,
unhealthy: bool,
) -> Result<Vec<MonitoringTarget>> {
Ok(vec![MonitoringTarget {
id: "target-1".to_string(),
address: "localhost:8080".to_string(),
target_type: "http".to_string(),
status: "healthy".to_string(),
last_check: "2023-11-01T12:00:00Z".to_string(),
labels: HashMap::new(),
}])
}
pub fn add_monitoring_target(
&self,
address: &str,
target_type: &str,
labels: &[String],
interval: Option<&str>,
timeout: Option<&str>,
) -> Result<String> {
info!("Adding monitoring target: {} ({})", address, target_type);
Ok("target-123".to_string())
}
pub async fn remove_monitoring_target(&self, target: &str) -> Result<()> {
info!("Removing monitoring target: {}", target);
Ok(())
}
pub fn update_monitoring_target(
&self,
target: &str,
labels: &[String],
interval: Option<&str>,
timeout: Option<&str>,
) -> Result<()> {
info!("Updating monitoring target: {}", target);
Ok(())
}
pub fn test_target_connectivity(&self, address: &str, timeout: u64) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: Some(50),
delivery_time: None,
warnings: vec![],
rules_count: 0,
triggered_alerts: None,
})
}
pub fn discover_targets(
&self,
method: Option<&crate::cli::advanced_monitoring::DiscoveryMethod>,
config_file: Option<&std::path::Path>,
) -> Result<Vec<DiscoveredTarget>> {
Ok(vec![DiscoveredTarget {
address: "192.168.1.100:8080".to_string(),
target_type: "http".to_string(),
labels: HashMap::from([("discovered".to_string(), "true".to_string())]),
}])
}
pub fn auto_add_discovered_targets(&self, targets: &[DiscoveredTarget]) -> Result<u32> {
Ok(targets.len() as u32)
}
pub fn list_alert_rules(
&self,
group: Option<&str>,
firing: bool,
) -> Result<Vec<AlertRuleInfo>> {
Ok(vec![AlertRuleInfo {
name: "HighResponseTime".to_string(),
group: "inferno.rules".to_string(),
state: "inactive".to_string(),
severity: "warning".to_string(),
firing_duration: None,
labels: HashMap::new(),
}])
}
pub async fn validate_alert_rules(&self, file: &PathBuf) -> Result<()> {
info!("Validating alert rules from: {}", file.display());
Ok(())
}
pub fn add_alert_rule(&self, file: &PathBuf, group: Option<&str>) -> Result<String> {
info!("Adding alert rule from: {}", file.display());
Ok("rule-123".to_string())
}
pub async fn remove_alert_rule(&self, name: &str, _group: Option<&str>) -> Result<()> {
info!("Removing alert rule: {}", name);
Ok(())
}
pub fn test_alert_rule(
&self,
rule: &str,
data: Option<&std::path::Path>,
duration: Option<&str>,
) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: None,
delivery_time: None,
warnings: vec![],
rules_count: 1,
triggered_alerts: Some(vec![]),
})
}
pub fn get_active_alerts(
&self,
severity: Option<&str>,
labels: &[String],
) -> Result<Vec<ActiveAlert>> {
Ok(vec![ActiveAlert {
name: "HighMemoryUsage".to_string(),
severity: "warning".to_string(),
started_at: "2023-11-01T12:00:00Z".to_string(),
duration: "5m".to_string(),
labels: HashMap::from([("instance".to_string(), "localhost:8080".to_string())]),
}])
}
pub fn get_alert_history(
&self,
start: Option<&str>,
end: Option<&str>,
rule: Option<&str>,
limit: Option<usize>,
) -> Result<Vec<AlertHistoryEntry>> {
Ok(vec![AlertHistoryEntry {
name: "HighResponseTime".to_string(),
state: "resolved".to_string(),
timestamp: "2023-11-01T11:00:00Z".to_string(),
duration: "10m".to_string(),
}])
}
pub fn acknowledge_alert(
&self,
alert: &str,
comment: Option<&str>,
expires: Option<&str>,
) -> Result<()> {
info!("Acknowledging alert: {}", alert);
Ok(())
}
pub fn export_metrics(
&self,
output: &PathBuf,
start: Option<&str>,
end: Option<&str>,
metrics: &[String],
format: &crate::cli::advanced_monitoring::ExportFormat,
compress: bool,
) -> Result<()> {
info!("Exporting metrics to: {}", output.display());
Ok(())
}
pub fn export_alerts(
&self,
output: &PathBuf,
start: Option<&str>,
end: Option<&str>,
format: &crate::cli::advanced_monitoring::ExportFormat,
) -> Result<()> {
info!("Exporting alerts to: {}", output.display());
Ok(())
}
pub fn export_configuration(
&self,
output: &PathBuf,
include_secrets: bool,
format: &crate::cli::advanced_monitoring::ExportFormat,
) -> Result<()> {
info!("Exporting configuration to: {}", output.display());
Ok(())
}
pub fn export_dashboards(
&self,
output: &PathBuf,
dashboards: &[String],
format: &crate::cli::advanced_monitoring::ExportFormat,
) -> Result<()> {
info!("Exporting dashboards to: {}", output.display());
Ok(())
}
pub async fn comprehensive_health_check(&self) -> Result<HealthCheckResult> {
Ok(HealthCheckResult {
healthy: true,
timestamp: Utc::now(),
components: HashMap::from([
(
"prometheus".to_string(),
ComponentStatus {
healthy: true,
message: "OK".to_string(),
response_time: Some(25),
},
),
(
"alertmanager".to_string(),
ComponentStatus {
healthy: true,
message: "OK".to_string(),
response_time: Some(30),
},
),
]),
memory_usage: Some(512),
cpu_usage: Some(45.2),
disk_usage: Some(68.5),
network_latency: Some(12),
})
}
pub async fn component_health_check(&self, component: &str) -> Result<HealthCheckResult> {
Ok(HealthCheckResult {
healthy: true,
timestamp: Utc::now(),
components: HashMap::from([(
component.to_string(),
ComponentStatus {
healthy: true,
message: "OK".to_string(),
response_time: Some(20),
},
)]),
memory_usage: None,
cpu_usage: None,
disk_usage: None,
network_latency: None,
})
}
pub async fn basic_health_check(&self) -> Result<HealthCheckResult> {
Ok(HealthCheckResult {
healthy: true,
timestamp: Utc::now(),
components: HashMap::new(),
memory_usage: None,
cpu_usage: None,
disk_usage: None,
network_latency: None,
})
}
pub async fn get_retention_policies(&self) -> Result<Vec<RetentionPolicy>> {
Ok(vec![
RetentionPolicy {
data_type: "metrics".to_string(),
retention_period: "30d".to_string(),
auto_cleanup: true,
last_cleanup: Some("2023-11-01T00:00:00Z".to_string()),
current_size: Some(1024),
},
RetentionPolicy {
data_type: "alerts".to_string(),
retention_period: "7d".to_string(),
auto_cleanup: true,
last_cleanup: Some("2023-11-01T00:00:00Z".to_string()),
current_size: Some(256),
},
])
}
pub fn update_retention_policies(
&self,
metrics: Option<&str>,
alerts: Option<&str>,
logs: Option<&str>,
auto_cleanup: Option<bool>,
) -> Result<()> {
info!("Updating retention policies");
Ok(())
}
pub fn preview_cleanup(
&self,
cleanup_type: Option<&crate::cli::advanced_monitoring::CleanupType>,
older_than: Option<&str>,
) -> Result<Vec<CleanupPreviewItem>> {
Ok(vec![CleanupPreviewItem {
path: "/data/metrics/old_data.db".to_string(),
size_mb: 256,
age: "45d".to_string(),
}])
}
pub fn perform_cleanup(
&self,
cleanup_type: Option<&crate::cli::advanced_monitoring::CleanupType>,
older_than: Option<&str>,
) -> Result<CleanupResult> {
Ok(CleanupResult {
deleted_count: 5,
freed_space_mb: 1024,
duration: Duration::from_secs(30),
})
}
pub fn compact_data(
&self,
level: Option<u32>,
start: Option<&str>,
end: Option<&str>,
) -> Result<CompactionResult> {
Ok(CompactionResult {
processed_blocks: 100,
space_saved_mb: 512,
duration: Duration::from_secs(300),
})
}
pub async fn test_component_config(
&self,
_component: &str,
_config_file: Option<&std::path::Path>,
) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: None,
delivery_time: None,
warnings: vec![],
rules_count: 0,
triggered_alerts: None,
})
}
pub async fn test_full_config(
&self,
_config_file: Option<&std::path::Path>,
) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: None,
delivery_time: None,
warnings: vec!["Configuration uses default values".to_string()],
rules_count: 0,
triggered_alerts: None,
})
}
pub async fn test_prometheus_connectivity(&self) -> Result<()> {
info!("Testing Prometheus connectivity");
Ok(())
}
pub async fn test_alertmanager_connectivity(&self) -> Result<()> {
info!("Testing Alertmanager connectivity");
Ok(())
}
pub async fn test_grafana_connectivity(&self) -> Result<()> {
info!("Testing Grafana connectivity");
Ok(())
}
pub fn test_alert_rules_file(
&self,
file: &std::path::Path,
data: Option<&std::path::Path>,
) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: None,
delivery_time: None,
warnings: vec![],
rules_count: 5,
triggered_alerts: None,
})
}
pub fn test_notification_channel(
&self,
receiver: &str,
message: Option<&str>,
) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: None,
delivery_time: Some(250),
warnings: vec![],
rules_count: 0,
triggered_alerts: None,
})
}
pub fn run_load_test(
&self,
concurrency: u32,
duration: u64,
rate: f64,
) -> Result<LoadTestResult> {
Ok(LoadTestResult {
total_requests: (duration * rate as u64),
successful_requests: (duration * rate as u64 * 95 / 100),
failed_requests: (duration * rate as u64 * 5 / 100),
avg_response_time: 150.0,
p95_response_time: 300.0,
p99_response_time: 500.0,
throughput: rate * 0.95,
})
}
pub async fn list_recording_rules(
&self,
_group: Option<&str>,
) -> Result<Vec<RecordingRuleInfo>> {
Ok(vec![RecordingRuleInfo {
name: "inferno:response_time_p95".to_string(),
group: "inferno.rules".to_string(),
interval: 30,
expression: "histogram_quantile(0.95, response_time_bucket)".to_string(),
}])
}
pub async fn add_recording_rule(&self, file: &PathBuf, _group: Option<&str>) -> Result<String> {
info!("Adding recording rule from: {}", file.display());
Ok("recording-rule-123".to_string())
}
pub async fn remove_recording_rule(&self, name: &str, _group: Option<&str>) -> Result<()> {
info!("Removing recording rule: {}", name);
Ok(())
}
pub fn test_recording_rule(
&self,
file: &PathBuf,
duration: Option<&str>,
) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: None,
delivery_time: None,
warnings: vec![],
rules_count: 1,
triggered_alerts: None,
})
}
pub async fn list_remote_write_endpoints(&self) -> Result<Vec<RemoteWriteEndpoint>> {
Ok(vec![RemoteWriteEndpoint {
name: "remote-storage".to_string(),
url: "https://remote.example.com/write".to_string(),
status: "active".to_string(),
}])
}
pub fn add_remote_write_endpoint(
&self,
url: &str,
name: Option<&str>,
auth: Option<&str>,
queue_config: Option<&std::path::Path>,
) -> Result<String> {
info!("Adding remote write endpoint: {}", url);
Ok("endpoint-123".to_string())
}
pub async fn remove_remote_write_endpoint(&self, endpoint: &str) -> Result<()> {
info!("Removing remote write endpoint: {}", endpoint);
Ok(())
}
pub fn test_remote_write_endpoint(&self, endpoint: &str, timeout: u64) -> Result<TestResult> {
Ok(TestResult {
success: true,
error: None,
response_time: Some(100),
delivery_time: None,
warnings: vec![],
rules_count: 0,
triggered_alerts: None,
})
}
pub async fn list_silences(&self, _expired: bool) -> Result<Vec<SilenceInfo>> {
Ok(vec![SilenceInfo {
id: "silence-123".to_string(),
matcher: "alertname=HighResponseTime".to_string(),
expires_at: "2023-11-02T12:00:00Z".to_string(),
created_by: "admin".to_string(),
comment: "Maintenance window".to_string(),
}])
}
pub fn create_silence(
&self,
matcher: &str,
duration: &str,
comment: Option<&str>,
created_by: Option<&str>,
) -> Result<String> {
info!("Creating silence for: {}", matcher);
Ok("silence-456".to_string())
}
pub async fn remove_silence(&self, id: &str) -> Result<()> {
info!("Removing silence: {}", id);
Ok(())
}
pub async fn extend_silence(&self, id: &str, duration: &str) -> Result<()> {
info!("Extending silence {} by {}", id, duration);
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthStatus {
pub collector_healthy: bool,
pub alertmanager_healthy: bool,
pub dashboards_healthy: bool,
pub export_healthy: bool,
pub prometheus_healthy: bool,
}
pub struct MetricsCollector {
config: MetricsCollectionConfig,
prometheus_client: Arc<PrometheusClient>,
custom_metrics: Arc<RwLock<HashMap<String, CustomMetricDefinition>>>,
running: Arc<RwLock<bool>>,
}
impl MetricsCollector {
pub fn new(
config: &MetricsCollectionConfig,
prometheus_client: Arc<PrometheusClient>,
) -> Result<Self> {
Ok(Self {
config: config.clone(),
prometheus_client,
custom_metrics: Arc::new(RwLock::new(HashMap::new())),
running: Arc::new(RwLock::new(false)),
})
}
pub async fn start(&self) -> Result<()> {
info!("Starting metrics collector");
{
let mut running = self.running.write().await;
*running = true;
}
let config = self.config.clone();
let prometheus_client = Arc::clone(&self.prometheus_client);
let custom_metrics = Arc::clone(&self.custom_metrics);
let running = Arc::clone(&self.running);
tokio::spawn(async move {
let mut interval = tokio::time::interval(config.interval);
while *running.read().await {
interval.tick().await;
if let Err(e) =
Self::collect_metrics(&config, &prometheus_client, &custom_metrics).await
{
error!("Failed to collect metrics: {}", e);
}
}
});
Ok(())
}
pub async fn stop(&self) -> Result<()> {
info!("Stopping metrics collector");
{
let mut running = self.running.write().await;
*running = false;
}
Ok(())
}
pub async fn register_custom_metric(&self, metric: CustomMetricDefinition) -> Result<()> {
info!("Registering custom metric: {}", metric.name);
let mut custom_metrics = self.custom_metrics.write().await;
custom_metrics.insert(metric.name.clone(), metric);
Ok(())
}
pub async fn is_healthy(&self) -> bool {
*self.running.read().await
}
async fn collect_metrics<'a>(
_config: &'a MetricsCollectionConfig,
prometheus_client: &'a PrometheusClient,
custom_metrics: &'a Arc<RwLock<HashMap<String, CustomMetricDefinition>>>,
) -> Result<()> {
debug!("Collecting metrics");
let system_metrics = collect_system_metrics().await?;
let app_metrics = collect_application_metrics().await?;
let custom_metrics_data = collect_custom_metrics(custom_metrics).await?;
let mut all_metrics = system_metrics;
all_metrics.extend(app_metrics);
all_metrics.extend(custom_metrics_data);
prometheus_client.push_metrics(all_metrics).await?;
Ok(())
}
}
pub struct AlertManager {
config: AlertingConfig,
alert_rules: Arc<RwLock<Vec<AlertRule>>>,
active_alerts: Arc<RwLock<HashMap<String, Alert>>>,
notification_channels: Arc<RwLock<Vec<NotificationChannel>>>,
alert_sender: broadcast::Sender<Alert>,
}
impl AlertManager {
pub fn new(config: &AlertingConfig) -> Result<Self> {
let (alert_sender, _) = broadcast::channel(1000);
Ok(Self {
config: config.clone(),
alert_rules: Arc::new(RwLock::new(config.rules.clone())),
active_alerts: Arc::new(RwLock::new(HashMap::new())),
notification_channels: Arc::new(RwLock::new(config.channels.clone())),
alert_sender,
})
}
pub async fn start(&self) -> Result<()> {
info!("Starting alert manager");
let alert_rules = Arc::clone(&self.alert_rules);
let active_alerts = Arc::clone(&self.active_alerts);
let alert_sender = self.alert_sender.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(15));
loop {
interval.tick().await;
if let Err(e) =
Self::evaluate_alerts(&alert_rules, &active_alerts, &alert_sender).await
{
error!("Failed to evaluate alerts: {}", e);
}
}
});
let notification_channels = Arc::clone(&self.notification_channels);
let mut alert_receiver = self.alert_sender.subscribe();
tokio::spawn(async move {
while let Ok(alert) = alert_receiver.recv().await {
if let Err(e) =
Self::handle_alert_notification(&alert, ¬ification_channels).await
{
error!("Failed to send alert notification: {}", e);
}
}
});
Ok(())
}
pub async fn stop(&self) -> Result<()> {
info!("Stopping alert manager");
Ok(())
}
pub async fn send_alert(&self, alert: Alert) -> Result<()> {
info!("Sending alert: {}", alert.name);
{
let mut active_alerts = self.active_alerts.write().await;
active_alerts.insert(alert.name.clone(), alert.clone());
}
let _ = self.alert_sender.send(alert);
Ok(())
}
pub async fn is_healthy(&self) -> bool {
true }
async fn evaluate_alerts(
alert_rules: &Arc<RwLock<Vec<AlertRule>>>,
active_alerts: &Arc<RwLock<HashMap<String, Alert>>>,
alert_sender: &broadcast::Sender<Alert>,
) -> Result<()> {
let rules = alert_rules.read().await;
for rule in rules.iter() {
let should_fire = evaluate_alert_rule(rule).await?;
if should_fire {
let alert = Alert {
name: rule.alert.clone(),
severity: rule.severity.clone(),
message: format!("Alert {} is firing", rule.alert),
labels: rule.labels.clone(),
annotations: rule.annotations.clone(),
starts_at: Utc::now(),
ends_at: None,
generator_url: None,
};
let _ = alert_sender.send(alert);
}
}
Ok(())
}
async fn handle_alert_notification(
alert: &Alert,
channels: &Arc<RwLock<Vec<NotificationChannel>>>,
) -> Result<()> {
let channels = channels.read().await;
for channel in channels.iter() {
if channel.enabled {
if let Err(e) = send_notification(channel, alert).await {
error!("Failed to send notification via {}: {}", channel.name, e);
}
}
}
Ok(())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
pub name: String,
pub severity: AlertSeverity,
pub message: String,
pub labels: HashMap<String, String>,
pub annotations: HashMap<String, String>,
pub starts_at: DateTime<Utc>,
pub ends_at: Option<DateTime<Utc>>,
pub generator_url: Option<String>,
}
pub struct DashboardManager {
config: DashboardsConfig,
dashboards: Arc<RwLock<HashMap<String, DashboardDefinition>>>,
}
impl DashboardManager {
pub fn new(config: &DashboardsConfig) -> Result<Self> {
Ok(Self {
config: config.clone(),
dashboards: Arc::new(RwLock::new(HashMap::new())),
})
}
pub async fn start(&self) -> Result<()> {
info!("Starting dashboard manager");
for dashboard in &self.config.dashboards {
self.register_dashboard(dashboard.clone()).await?;
}
if self.config.auto_import.enabled {
let config = self.config.auto_import.clone();
let dashboards = Arc::clone(&self.dashboards);
tokio::spawn(async move {
if config.watch {
Self::watch_dashboard_directory(&config, &dashboards).await;
} else {
let mut interval = tokio::time::interval(config.interval);
loop {
interval.tick().await;
if let Err(e) = Self::import_dashboards(&config, &dashboards).await {
error!("Failed to import dashboards: {}", e);
}
}
}
});
}
Ok(())
}
pub async fn stop(&self) -> Result<()> {
info!("Stopping dashboard manager");
Ok(())
}
pub async fn register_dashboard(&self, dashboard: DashboardDefinition) -> Result<()> {
info!("Registering dashboard: {}", dashboard.name);
let mut dashboards = self.dashboards.write().await;
dashboards.insert(dashboard.name.clone(), dashboard);
Ok(())
}
pub async fn is_healthy(&self) -> bool {
true }
async fn watch_dashboard_directory(
config: &AutoImportConfig,
dashboards: &Arc<RwLock<HashMap<String, DashboardDefinition>>>,
) {
info!(
"Watching dashboard directory: {}",
config.directory.display()
);
}
async fn import_dashboards(
config: &AutoImportConfig,
dashboards: &Arc<RwLock<HashMap<String, DashboardDefinition>>>,
) -> Result<()> {
debug!("Importing dashboards from: {}", config.directory.display());
Ok(())
}
}
pub struct ExportManager {
config: ExportConfig,
exporters: Vec<Arc<dyn MetricsExporter>>,
}
impl ExportManager {
pub fn new(config: &ExportConfig) -> Result<Self> {
let mut exporters: Vec<Arc<dyn MetricsExporter>> = Vec::new();
for target in &config.targets {
let exporter = create_exporter(target)?;
exporters.push(exporter);
}
Ok(Self {
config: config.clone(),
exporters,
})
}
pub async fn start(&self) -> Result<()> {
info!("Starting export manager");
for exporter in &self.exporters {
exporter.start().await?;
}
Ok(())
}
pub async fn stop(&self) -> Result<()> {
info!("Stopping export manager");
for exporter in &self.exporters {
exporter.stop().await?;
}
Ok(())
}
pub async fn is_healthy(&self) -> bool {
for exporter in &self.exporters {
if !exporter.is_healthy().await {
return false;
}
}
true
}
}
#[async_trait::async_trait]
pub trait MetricsExporter: Send + Sync {
async fn start(&self) -> Result<()>;
async fn stop(&self) -> Result<()>;
async fn export(&self, metrics: Vec<Metric>) -> Result<()>;
async fn is_healthy(&self) -> bool;
}
pub struct PrometheusClient {
config: PrometheusConfig,
#[cfg(feature = "reqwest")]
client: reqwest::Client,
}
impl PrometheusClient {
pub fn new(config: &PrometheusConfig) -> Result<Self> {
#[cfg(feature = "reqwest")]
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {}", e))?;
Ok(Self {
config: config.clone(),
#[cfg(feature = "reqwest")]
client,
})
}
#[cfg(feature = "reqwest")]
pub async fn query(&self, query: &str) -> Result<MetricQueryResult> {
let url = format!("{}/api/v1/query", self.config.endpoint);
let response = self
.client
.get(&url)
.query(&[("query", query)])
.send()
.await?;
let result: serde_json::Value = response.json().await?;
Ok(MetricQueryResult {
status: "success".to_string(),
data: result,
})
}
#[cfg(not(feature = "reqwest"))]
pub async fn query(&self, _query: &str) -> Result<MetricQueryResult> {
Err(anyhow::anyhow!(
"HTTP client support not enabled. Compile with --features reqwest"
))
}
#[cfg(feature = "reqwest")]
pub async fn query_range(
&self,
query: &str,
start: DateTime<Utc>,
end: DateTime<Utc>,
) -> Result<MetricQueryResult> {
let url = format!("{}/api/v1/query_range", self.config.endpoint);
let response = self
.client
.get(&url)
.query(&[
("query", query),
("start", &start.timestamp().to_string()),
("end", &end.timestamp().to_string()),
("step", "15s"),
])
.send()
.await?;
let result: serde_json::Value = response.json().await?;
Ok(MetricQueryResult {
status: "success".to_string(),
data: result,
})
}
#[cfg(not(feature = "reqwest"))]
pub async fn query_range(
&self,
_query: &str,
_start: DateTime<Utc>,
_end: DateTime<Utc>,
) -> Result<MetricQueryResult> {
Err(anyhow::anyhow!(
"HTTP client support not enabled. Compile with --features reqwest"
))
}
#[cfg(feature = "reqwest")]
pub async fn push_metrics(&self, metrics: Vec<Metric>) -> Result<()> {
if let Some(push_gateway) = &self.config.push_gateway {
let url = format!("{}/metrics/job/inferno", push_gateway);
let metrics_text = format_metrics_for_prometheus(metrics);
self.client
.post(&url)
.header("Content-Type", "text/plain; version=0.0.4; charset=utf-8")
.body(metrics_text)
.send()
.await?;
}
Ok(())
}
#[cfg(not(feature = "reqwest"))]
pub async fn push_metrics(&self, _metrics: Vec<Metric>) -> Result<()> {
warn!("HTTP client support not enabled - metrics push skipped");
Ok(())
}
#[cfg(feature = "reqwest")]
pub async fn is_healthy(&self) -> bool {
let url = format!("{}/api/v1/query", self.config.endpoint);
match self.client.get(&url).query(&[("query", "up")]).send().await {
Ok(response) => response.status().is_success(),
Err(_) => false,
}
}
#[cfg(not(feature = "reqwest"))]
pub async fn is_healthy(&self) -> bool {
warn!("HTTP client support not enabled - health check skipped");
false
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricQueryResult {
pub status: String,
pub data: serde_json::Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metric {
pub name: String,
pub value: f64,
pub labels: HashMap<String, String>,
pub timestamp: DateTime<Utc>,
pub metric_type: MetricType,
}
async fn collect_system_metrics() -> Result<Vec<Metric>> {
Ok(vec![
Metric {
name: "cpu_usage_percent".to_string(),
value: 45.2,
labels: HashMap::new(),
timestamp: Utc::now(),
metric_type: MetricType::Gauge,
},
Metric {
name: "memory_usage_bytes".to_string(),
value: 2_147_483_648.0, labels: HashMap::new(),
timestamp: Utc::now(),
metric_type: MetricType::Gauge,
},
])
}
async fn collect_application_metrics() -> Result<Vec<Metric>> {
Ok(vec![Metric {
name: "inference_requests_total".to_string(),
value: 1250.0,
labels: HashMap::from([("model".to_string(), "llama-7b".to_string())]),
timestamp: Utc::now(),
metric_type: MetricType::Counter,
}])
}
async fn collect_custom_metrics(
_custom_metrics: &Arc<RwLock<HashMap<String, CustomMetricDefinition>>>,
) -> Result<Vec<Metric>> {
Ok(vec![])
}
async fn evaluate_alert_rule(_rule: &AlertRule) -> Result<bool> {
Ok(false)
}
async fn send_notification(channel: &NotificationChannel, alert: &Alert) -> Result<()> {
match channel.channel_type {
ChannelType::Email => {
info!("Sending email notification for alert: {}", alert.name);
}
ChannelType::Slack => {
info!("Sending Slack notification for alert: {}", alert.name);
}
ChannelType::Webhook => {
info!("Sending webhook notification for alert: {}", alert.name);
}
_ => {
info!(
"Sending {:?} notification for alert: {}",
channel.channel_type, alert.name
);
}
}
Ok(())
}
fn create_exporter(target: &ExportTarget) -> Result<Arc<dyn MetricsExporter>> {
match target.target_type {
ExportTargetType::File => Ok(Arc::new(FileExporter::new(&target.config)?)),
ExportTargetType::Http => Ok(Arc::new(HttpExporter::new(&target.config)?)),
_ => Err(anyhow::anyhow!(
"Unsupported export target type: {:?}",
target.target_type
)),
}
}
fn format_metrics_for_prometheus(metrics: Vec<Metric>) -> String {
let mut output = String::new();
for metric in metrics {
let labels = if metric.labels.is_empty() {
String::new()
} else {
let label_pairs: Vec<String> = metric
.labels
.iter()
.map(|(k, v)| format!("{}=\"{}\"", k, v))
.collect();
format!("{{{}}}", label_pairs.join(","))
};
output.push_str(&format!(
"{}{} {} {}\n",
metric.name,
labels,
metric.value,
metric.timestamp.timestamp_millis()
));
}
output
}
struct FileExporter {
config: ExportTargetConfig,
}
impl FileExporter {
fn new(config: &ExportTargetConfig) -> Result<Self> {
Ok(Self {
config: config.clone(),
})
}
}
#[async_trait::async_trait]
impl MetricsExporter for FileExporter {
async fn start(&self) -> Result<()> {
Ok(())
}
async fn stop(&self) -> Result<()> {
Ok(())
}
async fn export(&self, _metrics: Vec<Metric>) -> Result<()> {
Ok(())
}
async fn is_healthy(&self) -> bool {
true
}
}
struct HttpExporter {
config: ExportTargetConfig,
}
impl HttpExporter {
fn new(config: &ExportTargetConfig) -> Result<Self> {
Ok(Self {
config: config.clone(),
})
}
}
#[async_trait::async_trait]
impl MetricsExporter for HttpExporter {
async fn start(&self) -> Result<()> {
Ok(())
}
async fn stop(&self) -> Result<()> {
Ok(())
}
async fn export(&self, _metrics: Vec<Metric>) -> Result<()> {
Ok(())
}
async fn is_healthy(&self) -> bool {
true
}
}
impl From<crate::monitoring::MonitoringConfig> for AdvancedMonitoringConfig {
fn from(config: crate::monitoring::MonitoringConfig) -> Self {
Self {
enabled: config.enabled,
prometheus: PrometheusConfig {
endpoint: config.prometheus.global.external_url,
push_gateway: None,
scrape_interval: config.prometheus.global.scrape_interval_seconds,
evaluation_interval: config.prometheus.global.scrape_interval_seconds,
remote_write: vec![],
service_discovery: ServiceDiscoveryConfig::default(),
recording_rules: vec![],
federation: FederationConfig::default(),
},
alerting: AlertingConfig {
alertmanager: AlertmanagerConfig::default(),
rules: vec![],
routing: RoutingConfig::default(),
channels: vec![],
inhibition: vec![],
silences: vec![],
},
collection: MetricsCollectionConfig {
interval: Duration::from_millis(config.collection_interval_ms),
interval_seconds: config.collection_interval_ms / 1000,
buffer_size: 1000,
batch_size: 100,
timeout: Duration::from_secs(30),
timeout_seconds: 30,
retry: RetryConfig::default(),
global_labels: HashMap::new(),
metrics: vec![],
},
dashboards: DashboardsConfig {
grafana: GrafanaConfig::default(),
dashboards: vec![],
auto_import: AutoImportConfig {
enabled: false,
directory: PathBuf::from("./dashboards"),
watch: false,
interval: Duration::from_secs(300),
},
},
retention: RetentionConfig {
default_retention: Duration::from_secs(config.metric_retention_hours * 3600),
per_metric_retention: HashMap::new(),
downsampling: vec![],
compaction: CompactionConfig::default(),
},
export: ExportConfig {
formats: vec![ExportFormat::Prometheus],
targets: vec![],
schedule: None,
},
custom_metrics: vec![],
}
}
}