pub mod commons;
pub mod rtr;
pub use commons::{RpkiAspaEntry, RpkiAspaProvider, RpkiAspaTableEntry, RpkiRoaEntry};
pub use rtr::RtrClient;
use crate::database::MonocleDatabase;
use crate::utils::option_u32_from_str;
use anyhow::Result;
use bgpkit_commons::rpki::RpkiTrie;
use chrono::NaiveDate;
use serde::{Deserialize, Serialize};
// Output format selector for RPKI lookup and validation results.
//
// `PartialEq`/`Eq` are derived (as on `RpkiValidationState`) so values can be
// compared with `==` instead of `matches!`.
//
// Plain `//` comments are used on purpose: clap turns `///` doc comments on
// `ValueEnum` variants into CLI help text.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
pub enum RpkiOutputFormat {
    // Rounded text table; this is the default.
    #[default]
    Table,
    // Compact JSON produced by `serde_json::to_string`.
    Json,
    // Indented JSON produced by `serde_json::to_string_pretty`.
    Pretty,
}
// Upstream source used when loading historical RPKI data.
//
// `load_historical_data` translates the variant into the optional source
// string handed to `commons::load_rpki_data`.
//
// `PartialEq`/`Eq` are derived (as on `RpkiValidationState`) so values can be
// compared with `==`. Plain `//` comments are used on purpose: clap turns
// `///` doc comments on `ValueEnum` variants into CLI help text.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
pub enum RpkiDataSource {
    // Default; passed on as "no explicit source" (`None`).
    #[default]
    Cloudflare,
    // Passed on as the source string "ripe".
    Ripe,
    // Passed on as the source string "rpkiviews".
    RpkiViews,
}
// Collector selection forwarded to `commons::load_rpki_data` when loading
// historical data. Each variant is passed on as its lowercase name
// ("soborost", "massars", "attn", "kerfuffle").
//
// `PartialEq`/`Eq` are derived (as on `RpkiValidationState`) so values can be
// compared with `==`. Plain `//` comments are used on purpose: clap turns
// `///` doc comments on `ValueEnum` variants into CLI help text.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "cli", derive(clap::ValueEnum))]
pub enum RpkiViewsCollectorOption {
    // Default collector.
    #[default]
    Soborost,
    Massars,
    Attn,
    Kerfuffle,
}
/// Outcome of validating a prefix/origin-ASN pair against the covering ROAs.
///
/// # Textual forms
///
/// Serde uses the lowercased variant name, so `NotFound` is serialized as
/// `"notfound"`, while the `Display` implementation prints `not_found`.
/// Serialized output is left untouched for compatibility; the alias on
/// `NotFound` additionally accepts the `Display` spelling when deserializing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum RpkiValidationState {
    /// A covering ROA matches the origin ASN and permits the prefix length.
    Valid,
    /// Covering ROAs exist, but none of them matches.
    Invalid,
    /// No covering ROA exists for the prefix.
    #[serde(alias = "not_found")]
    NotFound,
}
impl std::fmt::Display for RpkiValidationState {
    /// Writes the fixed label of the state: `valid`, `invalid` or `not_found`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            RpkiValidationState::Valid => "valid",
            RpkiValidationState::Invalid => "invalid",
            RpkiValidationState::NotFound => "not_found",
        };
        f.write_str(label)
    }
}
/// Result of validating one prefix/origin-ASN pair, as produced by
/// `RpkiLens::validate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpkiValidationResult {
/// The prefix string exactly as it was passed to validation.
pub prefix: String,
/// The origin ASN that was checked.
pub asn: u32,
/// Validation outcome.
pub state: RpkiValidationState,
/// Human-readable explanation of `state`.
pub reason: String,
/// ROAs covering the prefix; empty when the state is `NotFound`.
pub covering_roas: Vec<RpkiRoaRecord>,
}
/// A single ROA as exposed by the lens; also renderable as a table row via
/// `tabled`.
#[derive(Debug, Clone, Serialize, Deserialize, tabled::Tabled)]
pub struct RpkiRoaRecord {
/// Prefix of the ROA, in textual form.
pub prefix: String,
/// Maximum prefix length the ROA permits.
pub max_length: u8,
/// Origin ASN named by the ROA.
pub origin_asn: u32,
/// Copied verbatim from the database record's `ta` field
/// (presumably the trust anchor / RIR name — confirm).
pub ta: String,
}
/// An ASPA entry reduced to plain ASNs: one customer and its provider list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpkiAspaRecord {
/// ASN of the customer.
pub customer_asn: u32,
/// ASNs listed as providers for `customer_asn`.
pub provider_asns: Vec<u32>,
}
/// Summary returned by `RpkiLens::refresh_with_rtr`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpkiRefreshResult {
/// Number of ROAs written to the cache.
pub roa_count: usize,
/// Number of ASPAs written to the cache.
pub aspa_count: usize,
/// Label of the ROA source, e.g. "Cloudflare", "Cloudflare (fallback)" or
/// "RTR (host:port)".
pub roa_source: String,
/// Set when the RTR fetch failed and Cloudflare data was used instead.
pub warning: Option<String>,
}
// Arguments for a ROA lookup. Usable as clap arguments (with the `cli`
// feature) and as a serde payload; every field is optional or defaulted.
//
// Plain `//` comments are used on purpose: clap derives CLI help text from
// `///` doc comments on argument fields.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "cli", derive(clap::Args))]
pub struct RpkiRoaLookupArgs {
// Optional prefix filter.
#[cfg_attr(feature = "cli", clap(short, long))]
pub prefix: Option<String>,
// Optional origin-ASN filter; deserialized through `option_u32_from_str`
// (presumably so string-encoded numbers are accepted — confirm in `utils`).
#[cfg_attr(feature = "cli", clap(short, long))]
#[serde(default, deserialize_with = "option_u32_from_str")]
pub asn: Option<u32>,
// When set, the lookup is historical (see `is_historical`) instead of
// reading the local cache.
#[cfg_attr(feature = "cli", clap(short, long))]
#[serde(default)]
pub date: Option<NaiveDate>,
// Data source for historical lookups; not consulted for cache lookups.
#[cfg_attr(feature = "cli", clap(long, default_value = "cloudflare"))]
#[serde(default)]
pub source: RpkiDataSource,
// Optional collector for historical lookups; not consulted for cache lookups.
#[cfg_attr(feature = "cli", clap(long))]
#[serde(default)]
pub collector: Option<RpkiViewsCollectorOption>,
// Output format.
#[cfg_attr(feature = "cli", clap(short, long, default_value = "table"))]
#[serde(default)]
pub format: RpkiOutputFormat,
}
impl RpkiRoaLookupArgs {
pub fn new() -> Self {
Self::default()
}
pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
self.prefix = Some(prefix.into());
self
}
pub fn with_asn(mut self, asn: u32) -> Self {
self.asn = Some(asn);
self
}
pub fn with_date(mut self, date: NaiveDate) -> Self {
self.date = Some(date);
self
}
pub fn with_source(mut self, source: RpkiDataSource) -> Self {
self.source = source;
self
}
pub fn with_format(mut self, format: RpkiOutputFormat) -> Self {
self.format = format;
self
}
pub fn is_historical(&self) -> bool {
self.date.is_some()
}
}
// Arguments for an ASPA lookup. Usable as clap arguments (with the `cli`
// feature) and as a serde payload; every field is optional or defaulted.
//
// Plain `//` comments are used on purpose: clap derives CLI help text from
// `///` doc comments on argument fields.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "cli", derive(clap::Args))]
pub struct RpkiAspaLookupArgs {
// Optional customer-ASN filter; deserialized through `option_u32_from_str`
// (presumably so string-encoded numbers are accepted — confirm in `utils`).
#[cfg_attr(feature = "cli", clap(short, long))]
#[serde(default, deserialize_with = "option_u32_from_str")]
pub customer_asn: Option<u32>,
// Optional provider-ASN filter; same deserializer as `customer_asn`.
#[cfg_attr(feature = "cli", clap(short, long))]
#[serde(default, deserialize_with = "option_u32_from_str")]
pub provider_asn: Option<u32>,
// When set, the lookup is historical (see `is_historical`) instead of
// reading the local cache.
#[cfg_attr(feature = "cli", clap(short, long))]
#[serde(default)]
pub date: Option<NaiveDate>,
// Data source for historical lookups; not consulted for cache lookups.
#[cfg_attr(feature = "cli", clap(long, default_value = "cloudflare"))]
#[serde(default)]
pub source: RpkiDataSource,
// Optional collector for historical lookups; not consulted for cache lookups.
#[cfg_attr(feature = "cli", clap(long))]
#[serde(default)]
pub collector: Option<RpkiViewsCollectorOption>,
// Output format.
#[cfg_attr(feature = "cli", clap(short, long, default_value = "table"))]
#[serde(default)]
pub format: RpkiOutputFormat,
}
impl RpkiAspaLookupArgs {
pub fn new() -> Self {
Self::default()
}
pub fn with_customer(mut self, asn: u32) -> Self {
self.customer_asn = Some(asn);
self
}
pub fn with_provider(mut self, asn: u32) -> Self {
self.provider_asn = Some(asn);
self
}
pub fn with_format(mut self, format: RpkiOutputFormat) -> Self {
self.format = format;
self
}
pub fn is_historical(&self) -> bool {
self.date.is_some()
}
}
// Arguments for validating one prefix/origin-ASN pair.
//
// Plain `//` comments are used on purpose: clap derives CLI help text from
// `///` doc comments on argument fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "cli", derive(clap::Args))]
pub struct RpkiValidateArgs {
// Prefix to validate. NOTE(review): carries no clap attribute, so it is
// presumably a positional CLI argument — confirm.
pub prefix: String,
// Origin ASN to validate; same remark as for `prefix`.
pub asn: u32,
// Output format.
#[cfg_attr(feature = "cli", clap(short, long, default_value = "table"))]
#[serde(default)]
pub format: RpkiOutputFormat,
}
/// Constructors for [`RpkiValidateArgs`].
impl RpkiValidateArgs {
    /// Builds validation arguments for `prefix` and `asn` with the default
    /// output format.
    pub fn new(prefix: impl Into<String>, asn: u32) -> Self {
        let prefix = prefix.into();
        let format = RpkiOutputFormat::default();
        Self { prefix, asn, format }
    }

    /// Returns the arguments with the output format replaced.
    pub fn with_format(self, format: RpkiOutputFormat) -> Self {
        Self { format, ..self }
    }
}
/// Lens over RPKI data: reads and refreshes the ROA/ASPA cache stored in a
/// [`MonocleDatabase`] and can load historical data on demand.
pub struct RpkiLens<'a> {
/// Database handle; used for the RPKI cache (`db.rpki()`) and for AS-name
/// lookups (`db.asinfo()`).
db: &'a MonocleDatabase,
/// Trie stored by the most recent `load_historical_data` call; `None` until
/// a historical lookup has been performed. It is overwritten on every call.
historical_trie: Option<RpkiTrie>,
}
impl<'a> RpkiLens<'a> {
pub fn new(db: &'a MonocleDatabase) -> Self {
Self {
db,
historical_trie: None,
}
}
/// Reports whether the RPKI cache in the database is empty.
///
/// The current implementation never returns `Err`.
pub fn is_empty(&self) -> Result<bool> {
    let cache_is_empty = self.db.rpki().is_empty();
    Ok(cache_is_empty)
}
/// Reports whether the RPKI cache should be refreshed, as decided by the
/// repository's `needs_refresh(ttl)`.
///
/// The current implementation never returns `Err`.
pub fn needs_refresh(&self, ttl: std::time::Duration) -> Result<bool> {
    let stale = self.db.rpki().needs_refresh(ttl);
    Ok(stale)
}
/// Explains why the cache needs a refresh, if it does.
///
/// Returns `Some(RefreshReason::Empty)` for an empty cache,
/// `Some(RefreshReason::Outdated)` when the repository reports that a
/// refresh is needed for `ttl`, and `None` otherwise. Emptiness is checked
/// first; the staleness check is skipped for an empty cache.
pub fn refresh_reason(
    &self,
    ttl: std::time::Duration,
) -> Result<Option<crate::utils::RefreshReason>> {
    use crate::utils::RefreshReason;

    let repo = self.db.rpki();
    let reason = if repo.is_empty() {
        Some(RefreshReason::Empty)
    } else if repo.needs_refresh(ttl) {
        Some(RefreshReason::Outdated)
    } else {
        None
    };
    Ok(reason)
}
/// Returns the cache metadata reported by the RPKI repository, if any.
///
/// This is a direct pass-through of the repository's `get_metadata`.
pub fn get_metadata(&self) -> Result<Option<crate::database::RpkiCacheMetadata>> {
    let repo = self.db.rpki();
    repo.get_metadata()
}
/// Reloads the current RPKI data and stores its ROAs and ASPAs in the cache,
/// labelling both sources as "Cloudflare".
///
/// Returns `(roa_count, aspa_count)` for the stored data.
///
/// # Errors
///
/// Fails when loading the current data or storing it fails.
pub fn refresh(&self) -> Result<(usize, usize)> {
    let trie = commons::load_current_rpki()?;

    let roas = extract_roas_from_trie(&trie);
    let aspas = extract_aspas_from_trie(&trie);
    let counts = (roas.len(), aspas.len());

    let repo = self.db.rpki();
    repo.store(&roas, &aspas, "Cloudflare", "Cloudflare")?;

    Ok(counts)
}
/// Refreshes the cache, optionally taking ROAs from an RTR server.
///
/// ASPAs always come from the current (Cloudflare) data set. ROAs come from
/// the RTR server at `rtr_endpoint` when one is given; otherwise they come
/// from the same Cloudflare data set.
///
/// # Arguments
///
/// * `rtr_endpoint` - optional `host:port` / `[ipv6]:port` endpoint.
/// * `rtr_timeout` - timeout handed to the RTR client.
/// * `no_fallback` - when `true`, an RTR failure is returned as an error
///   instead of falling back to Cloudflare ROAs.
///
/// # Errors
///
/// Fails when the endpoint cannot be parsed, when loading the current data
/// fails, when the RTR fetch fails and `no_fallback` is set, or when
/// storing fails.
pub fn refresh_with_rtr(
    &self,
    rtr_endpoint: Option<&str>,
    rtr_timeout: std::time::Duration,
    no_fallback: bool,
) -> Result<RpkiRefreshResult> {
    // Reject a malformed endpoint before doing any network work.
    let rtr_target = match rtr_endpoint {
        Some(endpoint) => parse_endpoint(endpoint)?,
        None => None,
    };

    tracing::info!("Loading ASPAs from Cloudflare...");
    let trie = commons::load_current_rpki()?;
    let aspas = extract_aspas_from_trie(&trie);
    let aspa_count = aspas.len();

    let (roas, roa_source, warning) = match rtr_target {
        None => {
            tracing::info!("Loading ROAs from Cloudflare...");
            let cloudflare_roas = extract_roas_from_trie(&trie);
            (cloudflare_roas, "Cloudflare".to_string(), None)
        }
        Some((host, port)) => {
            tracing::info!("Connecting to RTR server {}:{}...", host, port);
            let rtr_client = rtr::RtrClient::new(host.clone(), port, rtr_timeout);
            match rtr_client.fetch_roas() {
                Ok(rtr_roas) => {
                    tracing::info!(
                        "Loaded {} ROAs from RTR server {}:{}",
                        rtr_roas.len(),
                        host,
                        port
                    );
                    (rtr_roas, format!("RTR ({}:{})", host, port), None)
                }
                // The caller asked for a hard failure instead of a fallback.
                Err(e) if no_fallback => {
                    return Err(anyhow::anyhow!(
                        "RTR fetch from {}:{} failed: {}",
                        host,
                        port,
                        e
                    ));
                }
                Err(e) => {
                    let warning_msg = format!(
                        "RTR fetch from {}:{} failed: {}. Falling back to Cloudflare.",
                        host, port, e
                    );
                    tracing::warn!("{}", warning_msg);
                    (
                        extract_roas_from_trie(&trie),
                        "Cloudflare (fallback)".to_string(),
                        Some(warning_msg),
                    )
                }
            }
        }
    };

    let roa_count = roas.len();
    let repo = self.db.rpki();
    repo.store(&roas, &aspas, &roa_source, "Cloudflare")?;
    tracing::info!(
        "Stored {} ROAs (from {}), {} ASPAs (from Cloudflare)",
        roa_count,
        roa_source,
        aspa_count
    );

    Ok(RpkiRefreshResult {
        roa_count,
        aspa_count,
        roa_source,
        warning,
    })
}
pub fn validate(&self, prefix: &str, asn: u32) -> Result<RpkiValidationResult> {
let covering_roas = self.get_covering_roas(prefix)?;
if covering_roas.is_empty() {
return Ok(RpkiValidationResult {
prefix: prefix.to_string(),
asn,
state: RpkiValidationState::NotFound,
reason: "No covering ROA found".to_string(),
covering_roas: Vec::new(),
});
}
let query_prefix_len = parse_prefix_length(prefix)?;
for roa in &covering_roas {
if roa.origin_asn == asn && query_prefix_len <= roa.max_length {
return Ok(RpkiValidationResult {
prefix: prefix.to_string(),
asn,
state: RpkiValidationState::Valid,
reason: "ROA exists with matching ASN and valid prefix length".to_string(),
covering_roas,
});
}
}
let has_matching_asn = covering_roas.iter().any(|r| r.origin_asn == asn);
let reason = if has_matching_asn {
format!(
"Prefix length {} exceeds max_length in covering ROAs",
query_prefix_len
)
} else {
let authorized_asns: Vec<String> = covering_roas
.iter()
.map(|r| r.origin_asn.to_string())
.collect();
format!(
"ASN {} not authorized; authorized ASNs: {}",
asn,
authorized_asns.join(", ")
)
};
Ok(RpkiValidationResult {
prefix: prefix.to_string(),
asn,
state: RpkiValidationState::Invalid,
reason,
covering_roas,
})
}
/// Returns the cached ROAs that cover `prefix`, converted to
/// [`RpkiRoaRecord`]s in the order the repository returns them.
///
/// # Errors
///
/// Fails when the repository lookup fails.
pub fn get_covering_roas(&self, prefix: &str) -> Result<Vec<RpkiRoaRecord>> {
    let mut records = Vec::new();
    for row in self.db.rpki().get_covering_roas(prefix)? {
        records.push(RpkiRoaRecord {
            prefix: row.prefix,
            max_length: row.max_length,
            origin_asn: row.origin_asn,
            ta: row.ta,
        });
    }
    Ok(records)
}
/// Looks up ROAs according to `args`.
///
/// Without a date the local cache is queried, and `args.source` and
/// `args.collector` are not consulted. With a date, historical data is
/// loaded for that date, source and collector and then filtered by the
/// optional prefix and ASN.
///
/// # Errors
///
/// Fails when loading historical data or querying either backend fails.
pub fn get_roas(&mut self, args: &RpkiRoaLookupArgs) -> Result<Vec<RpkiRoaEntry>> {
    let prefix = args.prefix.as_deref();

    if !args.is_historical() {
        return self.get_roas_from_cache(prefix, args.asn);
    }

    let trie = self.load_historical_data(args.date, &args.source, args.collector.as_ref())?;
    commons::get_roas(trie, prefix, args.asn)
}
/// Queries the local cache for ROAs, applying whichever filters are given.
///
/// * prefix and ASN: ROAs covering the prefix whose origin equals the ASN.
/// * prefix only: all ROAs covering the prefix.
/// * ASN only: all ROAs with that origin.
/// * neither: every cached ROA.
///
/// # Errors
///
/// Fails when the repository query fails.
fn get_roas_from_cache(
    &self,
    prefix: Option<&str>,
    asn: Option<u32>,
) -> Result<Vec<RpkiRoaEntry>> {
    let repo = self.db.rpki();

    let rows = match prefix {
        Some(p) => {
            let covering = repo.get_covering_roas(p)?;
            match asn {
                Some(a) => covering.into_iter().filter(|r| r.origin_asn == a).collect(),
                None => covering,
            }
        }
        None => match asn {
            Some(a) => repo.get_roas_by_asn(a)?,
            None => repo.get_all_roas()?,
        },
    };

    let mut entries = Vec::new();
    for row in rows {
        entries.push(RpkiRoaEntry {
            prefix: row.prefix,
            max_length: row.max_length,
            origin_asn: row.origin_asn,
            ta: row.ta,
        });
    }
    Ok(entries)
}
/// Returns the cached ROAs whose origin is `asn`, converted to
/// [`RpkiRoaRecord`]s in the order the repository returns them.
///
/// # Errors
///
/// Fails when the repository lookup fails.
pub fn get_roas_by_asn(&self, asn: u32) -> Result<Vec<RpkiRoaRecord>> {
    let rows = self.db.rpki().get_roas_by_asn(asn)?;
    let to_record = |row: _| RpkiRoaRecord {
        prefix: row.prefix,
        max_length: row.max_length,
        origin_asn: row.origin_asn,
        ta: row.ta,
    };
    Ok(rows.into_iter().map(to_record).collect())
}
/// Looks up ASPAs according to `args`.
///
/// Without a date the local cache is queried, and `args.source` and
/// `args.collector` are not consulted. With a date, historical data is
/// loaded and filtered by the optional customer and provider ASNs, and
/// missing AS names are then filled in from the database.
///
/// # Errors
///
/// Fails when loading historical data or querying either backend fails.
pub fn get_aspas(&mut self, args: &RpkiAspaLookupArgs) -> Result<Vec<RpkiAspaEntry>> {
    if !args.is_historical() {
        return self.get_aspas_from_cache(args.customer_asn, args.provider_asn);
    }

    let trie = self.load_historical_data(args.date, &args.source, args.collector.as_ref())?;
    let mut entries = commons::get_aspas(trie, args.customer_asn, args.provider_asn)?;
    self.enrich_aspa_names(&mut entries);
    Ok(entries)
}
/// Queries the local cache for enriched ASPAs, applying whichever filters are
/// given.
///
/// * customer and provider: the customer's ASPAs, each reduced to the
///   matching provider; ASPAs left without providers are dropped.
/// * customer only / provider only: the repository's respective query.
/// * neither: every cached ASPA.
///
/// # Errors
///
/// Fails when the repository query fails.
fn get_aspas_from_cache(
    &self,
    customer_asn: Option<u32>,
    provider_asn: Option<u32>,
) -> Result<Vec<RpkiAspaEntry>> {
    let repo = self.db.rpki();

    let rows = match (customer_asn, provider_asn) {
        (None, None) => repo.get_all_aspas_enriched()?,
        (None, Some(provider)) => repo.get_aspas_by_provider_enriched(provider)?,
        (Some(customer), None) => repo.get_aspas_by_customer_enriched(customer)?,
        (Some(customer), Some(provider)) => repo
            .get_aspas_by_customer_enriched(customer)?
            .into_iter()
            .filter_map(|mut aspa| {
                aspa.providers.retain(|prov| prov.asn == provider);
                if aspa.providers.is_empty() {
                    None
                } else {
                    Some(aspa)
                }
            })
            .collect(),
    };

    let mut entries = Vec::new();
    for row in rows {
        let providers = row
            .providers
            .into_iter()
            .map(|p| RpkiAspaProvider {
                asn: p.asn,
                name: p.name,
            })
            .collect();
        entries.push(RpkiAspaEntry {
            customer_asn: row.customer_asn,
            customer_name: row.customer_name,
            customer_country: row.customer_country,
            providers,
        });
    }
    Ok(entries)
}
/// Fills in missing customer and provider names on `aspas` with one batch
/// lookup against the AS-info repository.
///
/// Names that are already present are left untouched, and ASNs the lookup
/// does not know stay `None`. The ASN list is sorted and de-duplicated
/// before the lookup, because the same provider usually appears in many
/// ASPAs.
fn enrich_aspa_names(&self, aspas: &mut [RpkiAspaEntry]) {
    let mut asns: Vec<_> = aspas
        .iter()
        .flat_map(|aspa| {
            std::iter::once(aspa.customer_asn).chain(aspa.providers.iter().map(|p| p.asn))
        })
        .collect();
    if asns.is_empty() {
        return;
    }
    asns.sort_unstable();
    asns.dedup();

    let names = self.db.asinfo().lookup_preferred_names_batch(&asns);

    for aspa in aspas.iter_mut() {
        if aspa.customer_name.is_none() {
            aspa.customer_name = names.get(&aspa.customer_asn).cloned();
        }
        for provider in aspa.providers.iter_mut() {
            if provider.name.is_none() {
                provider.name = names.get(&provider.asn).cloned();
            }
        }
    }
}
/// Returns the first cached ASPA for `customer_asn`, or `None` when the
/// repository has none. Any further ASPAs for the customer are ignored.
///
/// # Errors
///
/// Fails when the repository lookup fails.
pub fn get_aspa_by_customer(&self, customer_asn: u32) -> Result<Option<RpkiAspaRecord>> {
    let first = self
        .db
        .rpki()
        .get_aspas_by_customer(customer_asn)?
        .into_iter()
        .next();

    match first {
        None => Ok(None),
        Some(aspa) => Ok(Some(RpkiAspaRecord {
            customer_asn: aspa.customer_asn,
            provider_asns: aspa.provider_asns,
        })),
    }
}
/// Loads RPKI data for the given date, source and collector, stores it on
/// the lens and returns a reference to it.
///
/// `Cloudflare` is passed on as "no explicit source"; the other sources and
/// all collectors are passed on as their lowercase names. Every call loads
/// the data again and replaces the previously stored trie.
///
/// # Errors
///
/// Fails when `commons::load_rpki_data` fails; the stored trie is then left
/// unchanged.
fn load_historical_data(
    &mut self,
    date: Option<NaiveDate>,
    source: &RpkiDataSource,
    collector: Option<&RpkiViewsCollectorOption>,
) -> Result<&RpkiTrie> {
    let source_str = match source {
        RpkiDataSource::Cloudflare => None,
        RpkiDataSource::Ripe => Some("ripe"),
        RpkiDataSource::RpkiViews => Some("rpkiviews"),
    };
    let collector_str = collector.map(|c| match c {
        RpkiViewsCollectorOption::Soborost => "soborost",
        RpkiViewsCollectorOption::Massars => "massars",
        RpkiViewsCollectorOption::Attn => "attn",
        RpkiViewsCollectorOption::Kerfuffle => "kerfuffle",
    });

    let trie = commons::load_rpki_data(date, source_str, collector_str)?;

    // `Option::insert` stores the value and hands back a reference to it, so
    // no `expect` (and no lint exemption) is needed.
    let stored: &RpkiTrie = self.historical_trie.insert(trie);
    Ok(stored)
}
/// Renders `roas` in the requested output format.
///
/// `Table` yields a rounded text table, `Json` compact JSON and `Pretty`
/// indented JSON. A JSON serialization failure yields an empty string.
pub fn format_roas(&self, roas: &[RpkiRoaEntry], format: &RpkiOutputFormat) -> String {
    use tabled::settings::Style;
    use tabled::Table;

    match format {
        RpkiOutputFormat::Json => serde_json::to_string(roas).unwrap_or_else(|_| String::new()),
        RpkiOutputFormat::Pretty => {
            serde_json::to_string_pretty(roas).unwrap_or_else(|_| String::new())
        }
        RpkiOutputFormat::Table => {
            let mut table = Table::new(roas);
            table.with(Style::rounded());
            table.to_string()
        }
    }
}
/// Renders `aspas` in the requested output format.
///
/// `Table` converts each entry to an `RpkiAspaTableEntry` and yields a
/// rounded text table; `Json` yields compact JSON and `Pretty` indented JSON.
/// A JSON serialization failure yields an empty string.
pub fn format_aspas(&self, aspas: &[RpkiAspaEntry], format: &RpkiOutputFormat) -> String {
    use tabled::settings::Style;
    use tabled::Table;

    match format {
        RpkiOutputFormat::Json => serde_json::to_string(aspas).unwrap_or_else(|_| String::new()),
        RpkiOutputFormat::Pretty => {
            serde_json::to_string_pretty(aspas).unwrap_or_else(|_| String::new())
        }
        RpkiOutputFormat::Table => {
            let mut rows: Vec<RpkiAspaTableEntry> = Vec::new();
            for aspa in aspas {
                rows.push(aspa.into());
            }
            let mut table = Table::new(rows);
            table.with(Style::rounded());
            table.to_string()
        }
    }
}
/// Renders a validation result in the requested output format.
///
/// `Table` yields a one-row summary table (prefix, asn, state, reason). When
/// covering ROAs exist, it is followed by a "Covering ROAs:" heading and a
/// second table. `Json` yields compact JSON and `Pretty` indented JSON; a
/// JSON serialization failure yields an empty string.
pub fn format_validation(
    &self,
    result: &RpkiValidationResult,
    format: &RpkiOutputFormat,
) -> String {
    use tabled::settings::Style;
    use tabled::Table;

    // Row type for the summary table; field names become the column headers.
    #[derive(tabled::Tabled)]
    struct ValidationRow {
        prefix: String,
        asn: u32,
        state: String,
        reason: String,
    }

    match format {
        RpkiOutputFormat::Json => serde_json::to_string(result).unwrap_or_default(),
        RpkiOutputFormat::Pretty => serde_json::to_string_pretty(result).unwrap_or_default(),
        RpkiOutputFormat::Table => {
            let summary = ValidationRow {
                prefix: result.prefix.clone(),
                asn: result.asn,
                state: result.state.to_string(),
                reason: result.reason.clone(),
            };
            let mut rendered = Table::new(vec![summary])
                .with(Style::rounded())
                .to_string();

            if result.covering_roas.is_empty() {
                return rendered;
            }

            let roa_table = Table::new(&result.covering_roas)
                .with(Style::rounded())
                .to_string();
            rendered.push_str("\n\nCovering ROAs:\n");
            rendered.push_str(&roa_table);
            rendered
        }
    }
}
}
/// Extracts the prefix length from a textual prefix such as `1.1.1.0/24`.
///
/// The input must have the form `<address>/<length>` with a non-empty address
/// part and exactly one `/`. The length must be a decimal number no greater
/// than 32, or no greater than 128 when the address part contains `:` (IPv6
/// notation). The address part itself is not parsed.
///
/// # Errors
///
/// Fails when the shape is wrong, the length is not a number, or the length
/// exceeds the maximum for the address family.
fn parse_prefix_length(prefix: &str) -> Result<u8> {
    let (addr, len_str) = match prefix.split_once('/') {
        Some((addr, len)) if !addr.is_empty() && !len.contains('/') => (addr, len),
        _ => anyhow::bail!("Invalid prefix format: {}", prefix),
    };

    let len = len_str
        .parse::<u8>()
        .map_err(|e| anyhow::anyhow!("Invalid prefix length: {}", e))?;

    // IPv6 text always contains ':' and IPv4 text never does.
    let max_len: u8 = if addr.contains(':') { 128 } else { 32 };
    if len > max_len {
        anyhow::bail!(
            "Invalid prefix length: {} exceeds maximum {} in {}",
            len,
            max_len,
            prefix
        );
    }

    Ok(len)
}
/// Parses an RTR endpoint of the form `host:port` or `[ipv6]:port`.
///
/// A leading `[` followed by `]:` selects the bracketed form, and the brackets
/// are removed from the returned host. Any other input is split at its last
/// `:`. The host part must not be empty and the port must fit in a `u16`.
///
/// On success the result is always `Some((host, port))`. The `Option` is part
/// of the existing signature, and `None` is never produced here.
///
/// # Errors
///
/// Fails when no `:` separator is present, the host is empty, or the port is
/// not a valid `u16`.
fn parse_endpoint(endpoint: &str) -> Result<Option<(String, u16)>> {
    let split = endpoint
        .strip_prefix('[')
        .and_then(|rest| rest.split_once("]:"))
        .or_else(|| endpoint.rsplit_once(':'));

    let (host, port_str) = match split {
        Some((host, port_str)) if !host.is_empty() => (host, port_str),
        _ => {
            return Err(anyhow::anyhow!(
                "Invalid RTR endpoint format: '{}'. Expected host:port or [ipv6]:port",
                endpoint
            ))
        }
    };

    let port = port_str
        .parse::<u16>()
        .map_err(|_| anyhow::anyhow!("Invalid RTR port: {}", port_str))?;

    Ok(Some((host.to_string(), port)))
}
/// Flattens every ROA stored in `trie` into database records.
///
/// One record is produced per (prefix, ROA) pair, in the trie's iteration
/// order. `ta` is the `Debug` rendering of the ROA's `rir`, or an empty string
/// when `rir` is absent.
pub fn extract_roas_from_trie(trie: &RpkiTrie) -> Vec<crate::database::RpkiRoaRecord> {
    let mut records = Vec::new();
    for (prefix, roas) in trie.trie.iter() {
        for roa in roas.iter() {
            let ta = match roa.rir {
                Some(rir) => format!("{:?}", rir),
                None => String::new(),
            };
            records.push(crate::database::RpkiRoaRecord {
                prefix: prefix.to_string(),
                max_length: roa.max_length,
                origin_asn: roa.asn,
                ta,
            });
        }
    }
    records
}
/// Converts every ASPA stored in `trie` into a database record, keeping the
/// original order and cloning each provider list.
pub fn extract_aspas_from_trie(trie: &RpkiTrie) -> Vec<crate::database::RpkiAspaRecord> {
    let mut records = Vec::new();
    for aspa in trie.aspas.iter() {
        records.push(crate::database::RpkiAspaRecord {
            customer_asn: aspa.customer_asn,
            provider_asns: aspa.providers.clone(),
        });
    }
    records
}
#[cfg(test)]
mod tests {
    use super::*;

    // The ROA builder stores each value and stays non-historical without a date.
    #[test]
    fn test_roa_lookup_args_builder() {
        let built = RpkiRoaLookupArgs::new()
            .with_prefix("1.1.1.0/24")
            .with_asn(13335)
            .with_format(RpkiOutputFormat::Json);

        assert_eq!(built.prefix, Some("1.1.1.0/24".to_string()));
        assert_eq!(built.asn, Some(13335));
        assert!(matches!(built.format, RpkiOutputFormat::Json));
        assert!(!built.is_historical());
    }

    // Setting a date makes the ROA lookup historical.
    #[test]
    fn test_roa_lookup_args_historical() {
        let new_year = NaiveDate::from_ymd_opt(2024, 1, 1).unwrap();
        let built = RpkiRoaLookupArgs::new()
            .with_date(new_year)
            .with_source(RpkiDataSource::Ripe);

        assert!(built.is_historical());
    }

    // The ASPA builder stores both ASN filters.
    #[test]
    fn test_aspa_lookup_args_builder() {
        let built = RpkiAspaLookupArgs::new()
            .with_customer(13335)
            .with_provider(174);

        assert_eq!(built.customer_asn, Some(13335));
        assert_eq!(built.provider_asn, Some(174));
        assert!(!built.is_historical());
    }

    // Validation arguments keep the prefix, the ASN and the chosen format.
    #[test]
    fn test_validate_args() {
        let built =
            RpkiValidateArgs::new("1.1.1.0/24", 13335).with_format(RpkiOutputFormat::Json);

        assert_eq!(built.prefix, "1.1.1.0/24");
        assert_eq!(built.asn, 13335);
        assert!(matches!(built.format, RpkiOutputFormat::Json));
    }

    // `Display` labels of the validation states.
    #[test]
    fn test_validation_state_display() {
        let expected = [
            (RpkiValidationState::Valid, "valid"),
            (RpkiValidationState::Invalid, "invalid"),
            (RpkiValidationState::NotFound, "not_found"),
        ];
        for &(state, label) in &expected {
            assert_eq!(state.to_string(), label);
        }
    }

    // Prefix lengths are taken from the part after the slash.
    #[test]
    fn test_parse_prefix_length() {
        let accepted = [
            ("1.1.1.0/24", 24u8),
            ("10.0.0.0/8", 8),
            ("2001:db8::/32", 32),
        ];
        for &(input, length) in &accepted {
            assert_eq!(parse_prefix_length(input).unwrap(), length);
        }

        assert!(parse_prefix_length("invalid").is_err());
    }

    // Endpoints parse as `host:port` or `[ipv6]:port`; anything else is an error.
    #[test]
    fn test_parse_endpoint() {
        let accepted = [
            ("rtr.example.com:8282", "rtr.example.com", 8282u16),
            ("192.0.2.1:8282", "192.0.2.1", 8282),
            ("[::1]:8282", "::1", 8282),
            ("[2001:db8::1]:323", "2001:db8::1", 323),
        ];
        for &(input, host, port) in &accepted {
            let parsed = parse_endpoint(input).unwrap();
            assert_eq!(parsed, Some((host.to_string(), port)));
        }

        for &rejected in &["no-port", "host:notanumber"] {
            assert!(parse_endpoint(rejected).is_err());
        }
    }
}