mod alerts;
mod collector;
mod config;
mod partitions;
mod storage;
mod tracing_layer;
pub use alerts::{
Alert, AlertCondition, AlertEvaluator, AlertRule, AlertSeverity, AlertStatus, AlertStore,
};
pub use collector::{
LogCollector, MetricsCollector, SystemMetricsCollector, SystemMetricsSnapshot, TraceCollector,
};
pub use config::{LogsConfig, MetricsConfig, ObservabilityConfig, TracesConfig};
pub use partitions::{PartitionConfig, PartitionGranularity, PartitionManager};
pub use storage::{LogStore, MetricsStore, TraceStore, TraceSummary};
pub use tracing_layer::ForgeTracingLayer;
use std::sync::Arc;
use std::time::Duration;
use forge_core::observability::{LogEntry, Metric, Span};
use forge_core::Result;
use tokio::sync::RwLock;
/// Shared handle to the observability subsystem.
///
/// Bundles the in-memory collectors for metrics, logs, and spans with the
/// stores they are persisted to, plus the alert store. All stores are built
/// from the same `sqlx::PgPool`.
///
/// Collectors, stores, and the shutdown flag sit behind `Arc`, so clones of
/// this struct share them; `config` is cloned by value.
#[derive(Clone)]
pub struct ObservabilityState {
/// Buffers metrics (including those fed by `system_metrics`) until a
/// background task drains them into `metrics_store`.
pub metrics_collector: Arc<MetricsCollector>,
/// Buffers log entries until a background task drains them into `log_store`.
pub log_collector: Arc<LogCollector>,
/// Buffers spans until a background task drains them into `trace_store`.
pub trace_collector: Arc<TraceCollector>,
/// Started by `start_background_tasks` with `metrics_collector` and a 15 s
/// period (presumably its sampling interval — confirm); stopped by `shutdown()`.
pub system_metrics: Arc<SystemMetricsCollector>,
/// Persistent metric storage; also periodically cleaned up per retention config.
pub metrics_store: Arc<MetricsStore>,
/// Persistent log storage; also periodically cleaned up per retention config.
pub log_store: Arc<LogStore>,
/// Persistent trace storage; also periodically cleaned up per retention config.
pub trace_store: Arc<TraceStore>,
/// Alert storage built from the same pool; not used by the background tasks here.
pub alert_store: Arc<AlertStore>,
/// Source of the flush interval and retention windows used by the background tasks.
config: ObservabilityConfig,
/// Copy of `config.enabled`; when false, recording, flushing, and background
/// task startup are skipped.
enabled: bool,
/// Set by `shutdown()`; background tasks check it on each tick and exit once true.
shutdown: Arc<RwLock<bool>>,
}
impl ObservabilityState {
pub fn new(config: ObservabilityConfig, pool: sqlx::PgPool) -> Self {
let enabled = config.enabled;
let metrics_collector = Arc::new(MetricsCollector::new(config.metrics.clone()));
let log_collector = Arc::new(LogCollector::new(config.logs.clone()));
let trace_collector = Arc::new(TraceCollector::new(config.traces.clone()));
let system_metrics = Arc::new(SystemMetricsCollector::new());
let metrics_store = Arc::new(MetricsStore::new(pool.clone()));
let log_store = Arc::new(LogStore::new(pool.clone()));
let trace_store = Arc::new(TraceStore::new(pool.clone()));
let alert_store = Arc::new(AlertStore::new(pool));
Self {
metrics_collector,
log_collector,
trace_collector,
system_metrics,
metrics_store,
log_store,
trace_store,
alert_store,
config,
enabled,
shutdown: Arc::new(RwLock::new(false)),
}
}
pub fn is_enabled(&self) -> bool {
self.enabled
}
pub async fn record_metric(&self, metric: Metric) {
if self.enabled {
self.metrics_collector.record(metric).await;
}
}
pub async fn increment_counter(&self, name: impl Into<String>, value: f64) {
if self.enabled {
self.metrics_collector.increment_counter(name, value).await;
}
}
pub async fn set_gauge(&self, name: impl Into<String>, value: f64) {
if self.enabled {
self.metrics_collector.set_gauge(name, value).await;
}
}
pub async fn record_log(&self, log: LogEntry) {
if self.enabled {
self.log_collector.record(log).await;
}
}
pub async fn info(&self, message: impl Into<String>) {
if self.enabled {
self.log_collector.info(message).await;
}
}
pub async fn warn(&self, message: impl Into<String>) {
if self.enabled {
self.log_collector.warn(message).await;
}
}
pub async fn error(&self, message: impl Into<String>) {
if self.enabled {
self.log_collector.error(message).await;
}
}
pub async fn record_span(&self, span: Span) {
if self.enabled {
self.trace_collector.record(span).await;
}
}
pub async fn flush(&self) -> Result<()> {
if !self.enabled {
return Ok(());
}
self.metrics_collector.flush().await;
self.log_collector.flush().await;
self.trace_collector.flush().await;
Ok(())
}
pub fn start_background_tasks(&self) -> Vec<tokio::task::JoinHandle<()>> {
let mut handles = Vec::new();
if !self.enabled {
return handles;
}
{
let collector = self.metrics_collector.clone();
let store = self.metrics_store.clone();
let interval = self.config.metrics.flush_interval;
let shutdown = self.shutdown.clone();
handles.push(tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
loop {
ticker.tick().await;
if *shutdown.read().await {
break;
}
let metrics = collector.drain().await;
if !metrics.is_empty() {
if let Err(e) = store.store(metrics).await {
tracing::warn!("Failed to persist metrics: {}", e);
}
}
}
}));
}
{
let collector = self.log_collector.clone();
let store = self.log_store.clone();
let shutdown = self.shutdown.clone();
let interval = Duration::from_secs(10);
handles.push(tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
loop {
ticker.tick().await;
if *shutdown.read().await {
break;
}
let logs = collector.drain().await;
if !logs.is_empty() {
if let Err(e) = store.store(logs).await {
tracing::warn!("Failed to persist logs: {}", e);
}
}
}
}));
}
{
let collector = self.trace_collector.clone();
let store = self.trace_store.clone();
let shutdown = self.shutdown.clone();
let interval = Duration::from_secs(10);
handles.push(tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
loop {
ticker.tick().await;
if *shutdown.read().await {
break;
}
let spans = collector.drain().await;
if !spans.is_empty() {
if let Err(e) = store.store(spans).await {
tracing::warn!("Failed to persist traces: {}", e);
}
}
}
}));
}
{
let handle = self
.system_metrics
.start(self.metrics_collector.clone(), Duration::from_secs(15));
handles.push(handle);
}
{
let metrics_store = self.metrics_store.clone();
let log_store = self.log_store.clone();
let trace_store = self.trace_store.clone();
let metrics_retention = self.config.metrics.raw_retention;
let logs_retention = self.config.logs.retention;
let traces_retention = self.config.traces.retention;
let shutdown = self.shutdown.clone();
handles.push(tokio::spawn(async move {
let mut ticker = tokio::time::interval(Duration::from_secs(3600)); loop {
ticker.tick().await;
if *shutdown.read().await {
break;
}
if let Err(e) = metrics_store.cleanup(metrics_retention).await {
tracing::warn!("Metrics cleanup error: {}", e);
}
if let Err(e) = log_store.cleanup(logs_retention).await {
tracing::warn!("Logs cleanup error: {}", e);
}
if let Err(e) = trace_store.cleanup(traces_retention).await {
tracing::warn!("Traces cleanup error: {}", e);
}
}
}));
}
handles
}
pub async fn shutdown(&self) {
let mut shutdown = self.shutdown.write().await;
*shutdown = true;
self.system_metrics.stop().await;
let _ = self.flush().await;
}
pub fn tracing_layer(&self) -> ForgeTracingLayer {
ForgeTracingLayer::new(self.log_collector.clone())
}
}