mod alerts;
mod circuit_breaker;
mod config;
mod detect;
mod error;
mod ingestion;
mod metrics;
mod model;
mod retention;
mod store;
mod web;
use detect::temporal;
use metrics::AppMetrics;
use retention::{RetentionConfig, RetentionService};
use clap::Parser;
use config::AppConfig;
use std::sync::Arc;
use tokio::signal;
use tracing::{error, info, Level};
use tracing_subscriber::FmtSubscriber;
// Command-line options for the service.
//
// Every option is optional. `--version` reports the Cargo package version (bare
// `version` attribute), which is the same value `main` logs at startup via
// `env!("CARGO_PKG_VERSION")`, so the two can no longer drift apart.
//
// Plain `//` comments are used on purpose: `///` doc comments would be picked up by
// clap's derive as help text.
#[derive(Parser)]
#[command(
    name = "adsb-anomaly",
    version,
    about = "ADS-B Aircraft Anomaly Detection System"
)]
struct Args {
    // Requested log verbosity; interpreted by `init_tracing`.
    #[arg(long, help = "Set log level")]
    log_level: Option<String>,

    // Location of the configuration file passed to `AppConfig::load`.
    #[arg(long, help = "Path to configuration file")]
    config: Option<String>,

    // Database file path override.
    #[arg(long, help = "Database file path")]
    db_path: Option<String>,

    // Web server port override.
    #[arg(long, help = "Web server port")]
    port: Option<u16>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
init_tracing(&args.log_level)?;
let mut app_config = AppConfig::load(args.config.as_deref())?;
app_config.merge_cli_overrides(args.config, args.db_path, args.port);
info!(
"Starting ADS-B Anomaly Detection System v{}",
env!("CARGO_PKG_VERSION")
);
info!("Configuration loaded successfully");
info!("Database path: {}", app_config.database.path);
info!("Web server will run on port: {}", app_config.web.port);
info!("PiAware URL: {}", app_config.adsb.piaware_url);
AppMetrics::init()?;
info!("Metrics initialized");
let pool =
store::connect_and_migrate(&app_config.database.path, app_config.database.wal_mode).await?;
info!("Database connection established and migrations completed");
let (temporal_sender, _temporal_receiver) = tokio::sync::mpsc::unbounded_channel();
let (signal_sender, _signal_receiver) = tokio::sync::mpsc::unbounded_channel();
let (identity_sender, _identity_receiver) = tokio::sync::mpsc::unbounded_channel();
let (behavior_sender, _behavior_receiver) = tokio::sync::mpsc::unbounded_channel();
let config = Arc::new(app_config.analysis.clone());
let _temporal_service = temporal::TemporalDetectionService::new(
config.clone(),
temporal_sender,
Some(10), );
let _signal_service =
detect::signal::SignalDetectionService::new(config.clone(), signal_sender);
let _identity_service =
detect::identity::IdentityDetectionService::new(config.clone(), identity_sender);
let _behavior_service =
detect::behavior::BehaviorDetectionService::new(config, behavior_sender);
let _alert_manager = Arc::new(tokio::sync::Mutex::new(alerts::AlertManager::new(
pool.clone(),
)));
let ingestion_pool = pool.clone();
let ingestion_url = app_config.adsb.piaware_url.clone();
let poll_interval = app_config.adsb.poll_interval_ms;
let circuit_breaker_config = app_config.adsb.circuit_breaker.clone().into();
let ingestion_handle = tokio::spawn(async move {
info!("Starting ingestion service with circuit breaker protection");
if let Err(e) = ingestion::service::run_ingestion_with_circuit_breaker(
ingestion_pool,
ingestion_url,
poll_interval,
circuit_breaker_config,
)
.await
{
error!("Ingestion service failed: {}", e);
}
});
let web_pool = pool.clone();
let web_config = app_config.web.clone();
let analysis_config = app_config.analysis.clone();
let web_handle = tokio::spawn(async move {
info!("Starting web server");
if let Err(e) = web::serve(web_pool, web_config, analysis_config).await {
error!("Web server failed: {}", e);
}
});
let retention_pool = pool.clone();
let retention_config = RetentionConfig::default(); let retention_handle = tokio::spawn(async move {
let retention_service = RetentionService::new(retention_pool, retention_config);
info!("Starting retention service");
retention_service.run().await;
});
info!("System ready - press Ctrl+C to shutdown");
signal::ctrl_c().await?;
info!("Shutdown signal received, stopping services...");
ingestion_handle.abort();
web_handle.abort();
retention_handle.abort();
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
info!("Shutdown complete");
Ok(())
}
/// Installs the global `tracing` subscriber at the requested verbosity.
///
/// The level name is taken from `log_level` when it is `Some`, otherwise from the
/// `RUST_LOG` environment variable. The name is trimmed and compared ignoring ASCII
/// case, so `DEBUG`, `Debug` and ` debug ` all select `Level::DEBUG`. Recognised
/// names are `trace`, `debug`, `warn` and `error`; anything else — including no
/// value at all, or a `RUST_LOG` value that is not a bare level name — selects
/// `Level::INFO`.
///
/// # Errors
/// Propagates the error returned by `tracing::subscriber::set_global_default`.
fn init_tracing(log_level: &Option<String>) -> Result<(), Box<dyn std::error::Error>> {
    // Held in a local so the `&str` borrowed from it below stays valid.
    let env_level = std::env::var("RUST_LOG").ok();

    // Normalise once so the match below is insensitive to case and stray whitespace.
    let requested = log_level
        .as_deref()
        .or(env_level.as_deref())
        .map(|name| name.trim().to_ascii_lowercase());

    let level = match requested.as_deref() {
        Some("trace") => Level::TRACE,
        Some("debug") => Level::DEBUG,
        Some("warn") => Level::WARN,
        Some("error") => Level::ERROR,
        _ => Level::INFO,
    };

    let subscriber = FmtSubscriber::builder().with_max_level(level).finish();
    tracing::subscriber::set_global_default(subscriber)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::CommandFactory;

    /// The clap command definition must pass clap's own consistency checks.
    #[test]
    fn test_cli_parsing() {
        Args::command().debug_assert();
    }

    /// Supplied options land in the matching fields; omitted ones stay `None`.
    #[test]
    fn test_cli_values_are_parsed() {
        let args = Args::try_parse_from(["adsb-anomaly", "--log-level", "debug", "--port", "8080"])
            .expect("valid arguments should parse");
        assert_eq!(args.log_level.as_deref(), Some("debug"));
        assert_eq!(args.port, Some(8080));
        assert!(args.config.is_none());
        assert!(args.db_path.is_none());
    }

    /// The global subscriber can only be installed once per process.
    #[test]
    fn test_tracing_init() {
        // The first call may succeed or fail depending on whether something else in
        // this test process already installed a global subscriber, so its result is
        // deliberately not asserted.
        let _ = init_tracing(&Some("info".to_string()));

        // At this point a global subscriber is definitely installed, so a second
        // attempt must be rejected rather than silently replacing it.
        assert!(init_tracing(&Some("debug".to_string())).is_err());
    }
}