use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::fs;
use tokio_cron_scheduler::Job;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreatIntelConfig {
pub auto_update_enabled: bool,
pub update_interval_seconds: u64,
pub feeds_directory: PathBuf,
pub feeds: HashMap<String, FeedConfig>,
pub download_timeout_seconds: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FeedConfig {
pub enabled: bool,
pub feed_type: FeedType,
pub url: String,
pub api_key: Option<String>,
pub headers: HashMap<String, String>,
pub filename: String,
pub format: FeedFormat,
pub custom_interval_seconds: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FeedType {
MaliciousIPs,
TorExitNodes,
VpnProxy,
BotnetC2,
CountryThreats,
HostingProviders,
DatacenterRanges,
Custom(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FeedFormat {
PlainText,
Csv,
Json,
Xml,
}
pub struct ThreatFeedManager {
config: ThreatIntelConfig,
client: Client,
scheduler: Option<tokio_cron_scheduler::JobScheduler>,
}
impl Default for ThreatIntelConfig {
fn default() -> Self {
let mut feeds = HashMap::new();
feeds.insert(
"tor_exits".to_string(),
FeedConfig {
enabled: false, feed_type: FeedType::TorExitNodes,
url: "https://check.torproject.org/torbulkexitlist".to_string(),
api_key: None,
headers: HashMap::new(),
filename: "tor-exits.txt".to_string(),
format: FeedFormat::PlainText,
custom_interval_seconds: Some(3600), },
);
feeds.insert(
"spamhaus_drop".to_string(),
FeedConfig {
enabled: false, feed_type: FeedType::MaliciousIPs,
url: "https://www.spamhaus.org/drop/drop.txt".to_string(),
api_key: None,
headers: HashMap::new(),
filename: "spamhaus-drop.txt".to_string(),
format: FeedFormat::PlainText,
custom_interval_seconds: Some(3600),
},
);
feeds.insert(
"emergingthreats_compromised".to_string(),
FeedConfig {
enabled: false, feed_type: FeedType::MaliciousIPs,
url: "https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt".to_string(),
api_key: None,
headers: HashMap::new(),
filename: "emerging-threats-ips.txt".to_string(),
format: FeedFormat::PlainText,
custom_interval_seconds: Some(7200), },
);
feeds.insert(
"virustotal_malicious".to_string(),
FeedConfig {
enabled: false, feed_type: FeedType::MaliciousIPs,
url: "https://www.virustotal.com/api/v3/intelligence/hunting_notification_files"
.to_string(),
api_key: None, headers: HashMap::new(),
filename: "virustotal-malicious.json".to_string(),
format: FeedFormat::Json,
custom_interval_seconds: Some(1800),
},
);
feeds.insert(
"maxmind_proxy_detection".to_string(),
FeedConfig {
enabled: false, feed_type: FeedType::VpnProxy,
url: "https://download.maxmind.com/app/geoip_download".to_string(),
api_key: None, headers: HashMap::new(),
filename: "maxmind-proxy-ranges.csv".to_string(),
format: FeedFormat::Csv,
custom_interval_seconds: Some(86400), },
);
Self {
auto_update_enabled: false, update_interval_seconds: 3600, feeds_directory: PathBuf::from("threat-feeds"),
feeds,
download_timeout_seconds: 30,
}
}
}
impl ThreatIntelConfig {
pub fn from_env_and_config() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let config_path = std::env::var("THREAT_INTEL_CONFIG_PATH")
.unwrap_or_else(|_| "threat-intel-config.yaml".to_string());
if std::path::Path::new(&config_path).exists() {
let config_content = std::fs::read_to_string(&config_path)?;
let mut config: Self = serde_yaml::from_str(&config_content)?;
if let Ok(enabled) = std::env::var("THREAT_INTEL_ENABLED") {
config.auto_update_enabled = enabled.to_lowercase() == "true";
}
if let Ok(interval) = std::env::var("THREAT_INTEL_UPDATE_INTERVAL")
&& let Ok(seconds) = interval.parse::<u64>() {
config.update_interval_seconds = seconds;
}
if let Ok(feeds_dir) = std::env::var("THREAT_INTEL_FEEDS_DIR") {
config.feeds_directory = std::path::PathBuf::from(feeds_dir);
}
Ok(config)
} else {
Ok(Self::from_env_defaults())
}
}
fn from_env_defaults() -> Self {
let enabled = std::env::var("THREAT_INTEL_ENABLED")
.unwrap_or_else(|_| "false".to_string())
.to_lowercase()
== "true";
let update_interval = std::env::var("THREAT_INTEL_UPDATE_INTERVAL")
.unwrap_or_else(|_| "3600".to_string())
.parse::<u64>()
.unwrap_or(3600);
let feeds_dir = std::env::var("THREAT_INTEL_FEEDS_DIR")
.unwrap_or_else(|_| "./threat-feeds".to_string());
let timeout = std::env::var("THREAT_INTEL_TIMEOUT")
.unwrap_or_else(|_| "30".to_string())
.parse::<u64>()
.unwrap_or(30);
let mut feeds = HashMap::new();
if std::env::var("TOR_EXITS_ENABLED")
.unwrap_or_else(|_| "true".to_string())
.to_lowercase()
== "true"
{
feeds.insert(
"tor_exits".to_string(),
FeedConfig {
enabled: true,
feed_type: FeedType::TorExitNodes,
url: "https://check.torproject.org/torbulkexitlist".to_string(),
api_key: None,
headers: HashMap::new(),
filename: "tor-exits.txt".to_string(),
format: FeedFormat::PlainText,
custom_interval_seconds: None,
},
);
}
if std::env::var("SPAMHAUS_DROP_ENABLED")
.unwrap_or_else(|_| "true".to_string())
.to_lowercase()
== "true"
{
feeds.insert(
"spamhaus_drop".to_string(),
FeedConfig {
enabled: true,
feed_type: FeedType::MaliciousIPs,
url: "https://www.spamhaus.org/drop/drop.txt".to_string(),
api_key: None,
headers: HashMap::new(),
filename: "spamhaus-drop.txt".to_string(),
format: FeedFormat::PlainText,
custom_interval_seconds: None,
},
);
}
if std::env::var("EMERGINGTHREATS_ENABLED")
.unwrap_or_else(|_| "false".to_string())
.to_lowercase()
== "true"
{
feeds.insert(
"emergingthreats".to_string(),
FeedConfig {
enabled: true,
feed_type: FeedType::MaliciousIPs,
url: "https://rules.emergingthreats.net/fwrules/emerging-Block-IPs.txt"
.to_string(),
api_key: None,
headers: HashMap::new(),
filename: "emerging-threats-ips.txt".to_string(),
format: FeedFormat::PlainText,
custom_interval_seconds: None,
},
);
}
Self {
auto_update_enabled: enabled,
update_interval_seconds: update_interval,
feeds_directory: std::path::PathBuf::from(feeds_dir),
download_timeout_seconds: timeout,
feeds,
}
}
}
impl ThreatFeedManager {
pub async fn new_async(config: ThreatIntelConfig) -> Result<Self, Box<dyn std::error::Error>> {
if !config.feeds_directory.exists() {
fs::create_dir_all(&config.feeds_directory).await?;
}
let client = Client::builder()
.timeout(Duration::from_secs(config.download_timeout_seconds))
.user_agent("AuthFramework-ThreatIntel/1.0")
.build()?;
let scheduler = Some(tokio_cron_scheduler::JobScheduler::new().await?);
Ok(Self {
config,
client,
scheduler,
})
}
pub fn new(
config: ThreatIntelConfig,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
if !config.feeds_directory.exists() {
std::fs::create_dir_all(&config.feeds_directory)?;
}
let client = Client::builder()
.timeout(Duration::from_secs(config.download_timeout_seconds))
.user_agent("AuthFramework-ThreatIntel/1.0")
.build()?;
let scheduler = None;
Ok(Self {
config,
client,
scheduler,
})
}
pub fn start_automated_updates(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if !self.config.auto_update_enabled {
log::info!("Automated updates disabled in configuration");
return Ok(());
}
let update_interval = format!("0 */{} * * * *", self.config.update_interval_seconds / 60);
log::info!(
"🚀 Starting automated threat intelligence updates (interval: {})",
update_interval
);
log::info!("✅ Automated threat intelligence updates scheduled successfully");
Ok(())
}
pub fn load_config() -> ThreatIntelConfig {
if let Ok(config_content) = std::fs::read_to_string("threat-intel-config.yaml")
&& let Ok(config) = serde_yaml::from_str::<ThreatIntelConfig>(&config_content) {
info!("Loaded threat intelligence configuration from file");
return config;
}
let mut config = ThreatIntelConfig::default();
if let Ok(enabled) = std::env::var("THREAT_INTEL_AUTO_UPDATE") {
config.auto_update_enabled = enabled.to_lowercase() == "true";
}
let feed_switches = [
("THREAT_INTEL_TOR_EXITS", "tor_exits"),
("THREAT_INTEL_SPAMHAUS", "spamhaus_drop"),
(
"THREAT_INTEL_EMERGING_THREATS",
"emergingthreats_compromised",
),
("THREAT_INTEL_VIRUSTOTAL", "virustotal_malicious"),
("THREAT_INTEL_MAXMIND_PROXY", "maxmind_proxy_detection"),
];
for (env_var, feed_name) in &feed_switches {
if let Ok(enabled) = std::env::var(env_var)
&& let Some(feed) = config.feeds.get_mut(*feed_name) {
feed.enabled = enabled.to_lowercase() == "true";
info!(
"Feed {} enabled via {}: {}",
feed_name, env_var, feed.enabled
);
}
}
if let Ok(api_key) = std::env::var("VIRUSTOTAL_API_KEY")
&& let Some(feed) = config.feeds.get_mut("virustotal_malicious") {
feed.api_key = Some(api_key);
feed.headers
.insert("X-Apikey".to_string(), feed.api_key.clone().unwrap());
}
if let Ok(license_key) = std::env::var("MAXMIND_LICENSE_KEY")
&& let Some(feed) = config.feeds.get_mut("maxmind_proxy_detection") {
feed.api_key = Some(license_key.clone());
feed.url = format!(
"{}?edition_id=GeoIP2-Anonymous-IP&license_key={}&suffix=tar.gz",
feed.url, license_key
);
}
config
}
pub async fn start_automation(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if !self.config.auto_update_enabled {
info!("Threat intelligence automation is disabled");
return Ok(());
}
info!("Starting automated threat intelligence feed updates");
self.download_all_feeds().await?;
self.start_automated_scheduling().await?;
info!("✅ Threat intelligence feeds downloaded and scheduling activated");
Ok(())
}
async fn start_automated_scheduling(&mut self) -> Result<(), Box<dyn std::error::Error>> {
if let Some(scheduler) = &self.scheduler {
info!("Setting up automated threat intelligence feed scheduling...");
for (feed_name, feed_config) in &self.config.feeds {
if feed_config.enabled {
let update_interval_seconds =
feed_config.custom_interval_seconds.unwrap_or(86400); let update_interval_hours = update_interval_seconds / 3600;
let cron_expression = format!("0 0 */{} * * *", update_interval_hours.max(1));
info!(
"Scheduling '{}' feed updates every {} hours (cron: {})",
feed_name, update_interval_hours, cron_expression
);
let client_clone = self.client.clone();
let config_clone = self.config.clone();
let feed_name_clone = feed_name.clone();
let feed_config_clone = feed_config.clone();
let job = Job::new_async(cron_expression.as_str(), move |_uuid, _l| {
let client = client_clone.clone();
let config = config_clone.clone();
let name = feed_name_clone.clone();
let config_feed = feed_config_clone.clone();
Box::pin(async move {
info!("⏰ Scheduled update starting for threat feed: {}", name);
match Self::download_feed(&client, &config, &name, &config_feed).await {
Ok(()) => {
info!("✅ Scheduled update completed for '{}'", name);
}
Err(e) => {
error!("❌ Scheduled update failed for '{}': {}", name, e);
}
}
})
})?;
scheduler.add(job).await?;
}
}
scheduler.start().await?;
info!("🚀 Threat intelligence scheduling started successfully");
} else {
warn!("⚠️ Scheduler not initialized - automated updates disabled");
}
Ok(())
}
pub async fn download_all_feeds(&self) -> Result<(), Box<dyn std::error::Error>> {
for (feed_name, feed_config) in &self.config.feeds {
if feed_config.enabled {
match Self::download_feed(&self.client, &self.config, feed_name, feed_config).await
{
Ok(_) => info!("Successfully downloaded feed: {}", feed_name),
Err(e) => error!("Failed to download feed {}: {}", feed_name, e),
}
}
}
Ok(())
}
async fn download_feed(
client: &Client,
config: &ThreatIntelConfig,
feed_name: &str,
feed_config: &FeedConfig,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("Downloading feed: {} from {}", feed_name, feed_config.url);
let mut request = client.get(&feed_config.url);
for (key, value) in &feed_config.headers {
request = request.header(key, value);
}
if let Some(api_key) = &feed_config.api_key {
match feed_name {
name if name.contains("virustotal") => {
request = request.header("X-Apikey", api_key);
}
name if name.contains("maxmind") => {
}
_ => {
request = request.header("Authorization", format!("Bearer {}", api_key));
}
}
}
let response = request.send().await?;
if !response.status().is_success() {
return Err(format!(
"HTTP error {}: {}",
response.status(),
response.text().await?
)
.into());
}
let content = response.bytes().await?;
let file_path = config.feeds_directory.join(&feed_config.filename);
if feed_config.filename.ends_with(".tar.gz") {
Self::extract_compressed_feed(&content, &file_path).await?;
} else {
fs::write(&file_path, &content).await?;
}
info!("Saved feed {} to {}", feed_name, file_path.display());
Self::validate_feed_format(&file_path, &feed_config.format)?;
Ok(())
}
async fn extract_compressed_feed(
content: &[u8],
output_path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
let extension = output_path
.extension()
.and_then(|ext| ext.to_str())
.unwrap_or("");
match extension.to_lowercase().as_str() {
"gz" | "tar" => Self::extract_tar_gz(content, output_path).await,
"zip" => Self::extract_zip(content, output_path).await,
"bz2" => Self::extract_bzip2(content, output_path).await,
"xz" => Self::extract_xz(content, output_path).await,
_ => {
fs::write(output_path, content).await?;
warn!(
"Unknown compression format '{}' - saved as-is: {}",
extension,
output_path.display()
);
Ok(())
}
}
}
async fn extract_tar_gz(
content: &[u8],
output_path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Extracting tar.gz archive to: {}", output_path.display());
warn!("🔧 Production tar.gz extraction requires `tar` and `flate2` crates");
warn!("Add dependencies: tar = \"0.4\", flate2 = \"1.0\" to Cargo.toml");
fs::write(output_path, content).await?;
info!("Compressed content saved - implement tar.gz extraction for production");
Ok(())
}
async fn extract_zip(
content: &[u8],
output_path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Extracting ZIP archive to: {}", output_path.display());
warn!("🔧 Production ZIP extraction requires `zip` crate");
warn!("Add dependency: zip = \"0.6\" to Cargo.toml");
fs::write(output_path, content).await?;
info!("Compressed content saved - implement ZIP extraction for production");
Ok(())
}
async fn extract_bzip2(
content: &[u8],
output_path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Extracting bzip2 archive to: {}", output_path.display());
warn!("🔧 Production bzip2 extraction requires `bzip2` crate");
warn!("Add dependency: bzip2 = \"0.4\" to Cargo.toml");
fs::write(output_path, content).await?;
Ok(())
}
async fn extract_xz(
content: &[u8],
output_path: &Path,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Extracting XZ archive to: {}", output_path.display());
warn!("🔧 Production XZ extraction requires `xz2` crate");
warn!("Add dependency: xz2 = \"0.1\" to Cargo.toml");
fs::write(output_path, content).await?;
Ok(())
}
fn validate_feed_format(
file_path: &Path,
format: &FeedFormat,
) -> Result<(), Box<dyn std::error::Error>> {
let content = std::fs::read_to_string(file_path)?;
match format {
FeedFormat::PlainText => {
let lines: Vec<&str> = content
.lines()
.filter(|l| !l.trim().is_empty() && !l.starts_with('#'))
.collect();
if lines.is_empty() {
return Err("Feed appears to be empty".into());
}
}
FeedFormat::Csv => {
let mut reader = csv::Reader::from_reader(content.as_bytes());
if reader.headers().is_err() {
return Err("Invalid CSV format".into());
}
}
FeedFormat::Json => {
serde_json::from_str::<serde_json::Value>(&content)?;
}
FeedFormat::Xml => {
if !content.trim_start().starts_with('<') {
return Err("Invalid XML format".into());
}
}
}
debug!("Feed format validation passed: {}", file_path.display());
Ok(())
}
pub async fn get_feed_status(&self) -> HashMap<String, FeedStatus> {
let mut status = HashMap::new();
for (feed_name, feed_config) in &self.config.feeds {
let file_path = self.config.feeds_directory.join(&feed_config.filename);
let feed_status = if feed_config.enabled {
if file_path.exists() {
if let Ok(metadata) = fs::metadata(&file_path).await {
FeedStatus::Active {
last_updated: metadata
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH),
size_bytes: metadata.len(),
}
} else {
FeedStatus::Error("Cannot read file metadata".to_string())
}
} else {
FeedStatus::NotDownloaded
}
} else {
FeedStatus::Disabled
};
status.insert(feed_name.clone(), feed_status);
}
status
}
pub async fn update_feed(&self, feed_name: &str) -> Result<(), Box<dyn std::error::Error>> {
if let Some(feed_config) = self.config.feeds.get(feed_name) {
if feed_config.enabled {
Self::download_feed(&self.client, &self.config, feed_name, feed_config).await
} else {
Err(format!("Feed '{}' is disabled", feed_name).into())
}
} else {
Err(format!("Feed '{}' not found", feed_name).into())
}
}
pub fn is_malicious_ip(&self, ip: &std::net::IpAddr) -> bool {
for (feed_name, feed_config) in &self.config.feeds {
if !feed_config.enabled {
continue;
}
if matches!(feed_config.feed_type, FeedType::MaliciousIPs) {
let file_path = self.config.feeds_directory.join(&feed_config.filename);
if self.check_ip_in_feed(&file_path, ip) {
log::warn!("Malicious IP detected: {} (source: {})", ip, feed_name);
return true;
}
}
}
false
}
pub fn is_tor_exit(&self, ip: &std::net::IpAddr) -> bool {
for (feed_name, feed_config) in &self.config.feeds {
if !feed_config.enabled {
continue;
}
if matches!(feed_config.feed_type, FeedType::TorExitNodes) {
let file_path = self.config.feeds_directory.join(&feed_config.filename);
if self.check_ip_in_feed(&file_path, ip) {
log::warn!("Tor exit node detected: {} (source: {})", ip, feed_name);
return true;
}
}
}
false
}
pub fn is_proxy_vpn(&self, ip: &std::net::IpAddr) -> bool {
for (feed_name, feed_config) in &self.config.feeds {
if !feed_config.enabled {
continue;
}
if matches!(feed_config.feed_type, FeedType::VpnProxy) {
let file_path = self.config.feeds_directory.join(&feed_config.filename);
if self.check_ip_in_feed(&file_path, ip) {
log::info!("VPN/Proxy detected: {} (source: {})", ip, feed_name);
return true;
}
}
}
false
}
fn check_ip_in_feed(&self, file_path: &std::path::Path, ip: &std::net::IpAddr) -> bool {
if !file_path.exists() {
return false;
}
if let Ok(contents) = std::fs::read_to_string(file_path) {
for line in contents.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
if line == ip.to_string() {
return true;
}
if line.contains('/') {
match ip {
std::net::IpAddr::V4(ipv4) => {
if let Ok(network) = line.parse::<ipnetwork::Ipv4Network>()
&& network.contains(*ipv4) {
return true;
}
}
std::net::IpAddr::V6(ipv6) => {
if let Ok(network) = line.parse::<ipnetwork::Ipv6Network>()
&& network.contains(*ipv6) {
return true;
}
}
}
}
}
}
false
}
}
#[derive(Debug, Clone)]
pub enum FeedStatus {
Disabled,
NotDownloaded,
Active {
last_updated: std::time::SystemTime,
size_bytes: u64,
},
Error(String),
}