use clap::Parser;
use oculus::{
collector::CollectorRegistry,
collector::http::HttpCollector,
collector::ping::PingCollector,
collector::tcp::TcpCollector,
config::{AppConfig, CollectorsConfig},
server::{AppState, create_router},
storage::{
CollectorRecord, CollectorType, Event, EventKind, EventSeverity, EventSource,
StorageBuilder,
},
};
use std::net::SocketAddr;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
#[derive(Parser, Debug)]
#[command(name = "oculus", version, about, long_about = None)]
struct Cli {
#[arg(
short,
long,
default_value = "configs/config.yaml",
env = "OCULUS_CONFIG"
)]
config: String,
#[arg(long, env = "OCULUS_SERVER_BIND")]
server_bind: Option<String>,
#[arg(long, env = "OCULUS_SERVER_PORT")]
server_port: Option<u16>,
#[arg(long, env = "OCULUS_DB_URL")]
db_url: Option<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,oculus=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
tracing::info!("Oculus - Unified Telemetry System");
let cli = Cli::parse();
tracing::info!("Loading configuration from: {}", cli.config);
let mut config = AppConfig::load(&cli.config)?;
if let Some(bind) = cli.server_bind {
config.server.bind = bind;
}
if let Some(port) = cli.server_port {
config.server.port = port;
}
if let Some(dsn) = cli.db_url {
config.database.dsn = dsn;
}
tracing::info!(
"Server: {}:{}, Database: {} ({})",
config.server.bind,
config.server.port,
config.database.dsn,
config.database.driver,
);
let db_url = config.database.connection_url();
tracing::info!("Initializing storage at: {}", db_url);
let handles = StorageBuilder::new(&db_url)
.channel_capacity(config.database.channel_capacity)
.build()
.await?;
tracing::info!("Storage initialized");
if let Some(ref include_path) = config.collector_include {
tracing::info!("Loading collectors from: {}", include_path);
let collectors_config = CollectorsConfig::load_from_dir(include_path)?;
collectors_config.validate()?;
let records = collectors_config.to_collector_records();
let mut inserted = 0;
let mut skipped = 0;
for record in &records {
match handles.collector_store.insert_if_not_exists(record).await? {
Some(id) => {
tracing::info!(
"Synced collector: {} ({}, id={})",
record.name,
record.collector_type.as_ref(),
id
);
inserted += 1;
}
None => {
tracing::debug!(
"Collector already exists, skipping: {} ({})",
record.name,
record.collector_type.as_ref()
);
skipped += 1;
}
}
}
tracing::info!(
"Collector sync complete: {} inserted, {} skipped",
inserted,
skipped
);
}
tracing::info!("Starting collector registry...");
let registry = CollectorRegistry::new(handles.writer.clone()).await?;
registry.start().await?;
if let Err(e) = handles.writer.insert_event(Event::new(
EventSource::System,
EventKind::System,
EventSeverity::Info,
"Service started",
)) {
tracing::warn!("Failed to emit service start event: {}", e);
}
let db_collectors = handles.collector_store.list_all().await?;
tracing::info!("Found {} collectors in database", db_collectors.len());
for record in db_collectors {
if !record.enabled {
tracing::debug!(
"Skipping disabled collector: {} ({})",
record.name,
record.collector_type.as_ref()
);
continue;
}
match spawn_collector(&record, &handles.writer, ®istry).await {
Ok(()) => {
tracing::info!(
"Spawned collector: {} ({})",
record.name,
record.collector_type.as_ref()
);
}
Err(e) => {
tracing::error!("Failed to spawn collector '{}': {}", record.name, e);
}
}
}
let app_state = AppState {
metric_reader: handles.metric_reader.clone(),
event_reader: handles.event_reader.clone(),
collector_store: handles.collector_store.clone(),
};
let app = create_router(app_state);
let addr: SocketAddr = format!("{}:{}", config.server.bind, config.server.port).parse()?;
tracing::info!("Web server listening on: http://{}", addr);
tracing::info!("Press Ctrl+C to shutdown");
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal(registry, handles))
.await?;
tracing::info!("Shutdown complete");
Ok(())
}
async fn spawn_collector(
record: &CollectorRecord,
writer: &oculus::StorageWriter,
registry: &CollectorRegistry,
) -> Result<(), Box<dyn std::error::Error>> {
match record.collector_type {
CollectorType::Tcp => {
let config: oculus::TcpConfig = serde_json::from_value(record.config.clone())?;
let collector = TcpCollector::new(config, writer.clone());
registry.spawn(collector).await?;
}
CollectorType::Ping => {
let config: oculus::PingConfig = serde_json::from_value(record.config.clone())?;
let collector = PingCollector::new(config, writer.clone());
registry.spawn(collector).await?;
}
CollectorType::Http => {
let config: oculus::collector::http::HttpConfig =
serde_json::from_value(record.config.clone())?;
let collector = HttpCollector::new(config, writer.clone())?;
registry.spawn(collector).await?;
}
}
Ok(())
}
async fn shutdown_signal(registry: CollectorRegistry, handles: oculus::StorageHandles) {
let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
tracing::info!("Received Ctrl+C signal");
}
_ = terminate => {
tracing::info!("Received terminate signal");
}
}
if let Err(e) = handles.writer.insert_event(Event::new(
EventSource::System,
EventKind::System,
EventSeverity::Info,
"Service stopping",
)) {
tracing::warn!("Failed to emit service stop event: {}", e);
}
tracing::info!("Shutting down collectors...");
if let Err(e) = registry.shutdown().await {
tracing::error!("Failed to shutdown collectors: {}", e);
}
tracing::info!("Shutting down storage...");
if let Err(e) = handles.shutdown().await {
tracing::error!("Failed to shutdown storage: {}", e);
}
}