use anyhow::{anyhow, Result};
use rusqlite::Connection;
pub const SCHEMA_VERSION: u32 = 3;
pub struct SchemaDefinitions;
impl SchemaDefinitions {
pub const META_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS monocle_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now'))
);
"#;
pub const AS2ORG_AS_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS as2org_as (
asn INTEGER PRIMARY KEY,
name TEXT NOT NULL,
org_id TEXT NOT NULL,
source TEXT NOT NULL
);
"#;
pub const AS2ORG_ORG_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS as2org_org (
org_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
country TEXT NOT NULL,
source TEXT NOT NULL
);
"#;
pub const AS2ORG_INDEXES: &'static [&'static str] = &[
"CREATE INDEX IF NOT EXISTS idx_as2org_as_org_id ON as2org_as(org_id)",
"CREATE INDEX IF NOT EXISTS idx_as2org_as_name ON as2org_as(name)",
"CREATE INDEX IF NOT EXISTS idx_as2org_org_name ON as2org_org(name)",
"CREATE INDEX IF NOT EXISTS idx_as2org_org_country ON as2org_org(country)",
];
pub const AS2ORG_VIEWS: &'static [&'static str] = &[
r#"
CREATE VIEW IF NOT EXISTS as2org_both AS
SELECT a.asn, a.name AS 'as_name', b.name AS 'org_name', b.org_id, b.country
FROM as2org_as AS a JOIN as2org_org AS b ON a.org_id = b.org_id;
"#,
r#"
CREATE VIEW IF NOT EXISTS as2org_count AS
SELECT org_id, org_name, COUNT(*) AS count
FROM as2org_both GROUP BY org_name
ORDER BY count DESC;
"#,
r#"
CREATE VIEW IF NOT EXISTS as2org_all AS
SELECT a.*, b.count
FROM as2org_both AS a JOIN as2org_count AS b ON a.org_id = b.org_id;
"#,
];
pub const AS2REL_META_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS as2rel_meta (
id INTEGER PRIMARY KEY CHECK (id = 1),
file_url TEXT NOT NULL,
last_updated INTEGER NOT NULL,
max_peers_count INTEGER NOT NULL DEFAULT 0
);
"#;
pub const AS2REL_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS as2rel (
asn1 INTEGER NOT NULL,
asn2 INTEGER NOT NULL,
paths_count INTEGER NOT NULL,
peers_count INTEGER NOT NULL,
rel INTEGER NOT NULL,
PRIMARY KEY (asn1, asn2, rel)
);
"#;
pub const AS2REL_INDEXES: &'static [&'static str] = &[
"CREATE INDEX IF NOT EXISTS idx_as2rel_asn1 ON as2rel(asn1)",
"CREATE INDEX IF NOT EXISTS idx_as2rel_asn2 ON as2rel(asn2)",
];
pub const RPKI_ROA_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS rpki_roa (
id INTEGER PRIMARY KEY AUTOINCREMENT,
prefix_start BLOB NOT NULL,
prefix_end BLOB NOT NULL,
prefix_length INTEGER NOT NULL,
max_length INTEGER NOT NULL,
origin_asn INTEGER NOT NULL,
ta TEXT NOT NULL,
prefix_str TEXT NOT NULL
);
"#;
pub const RPKI_ASPA_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS rpki_aspa (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_asn INTEGER NOT NULL,
provider_asn INTEGER NOT NULL
);
"#;
pub const RPKI_META_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS rpki_meta (
id INTEGER PRIMARY KEY CHECK (id = 1),
updated_at INTEGER NOT NULL,
roa_count INTEGER NOT NULL DEFAULT 0,
aspa_count INTEGER NOT NULL DEFAULT 0,
roa_source TEXT NOT NULL DEFAULT 'Cloudflare',
aspa_source TEXT NOT NULL DEFAULT 'Cloudflare'
);
"#;
pub const RPKI_INDEXES: &'static [&'static str] = &[
"CREATE INDEX IF NOT EXISTS idx_rpki_roa_prefix_range ON rpki_roa(prefix_start, prefix_end)",
"CREATE INDEX IF NOT EXISTS idx_rpki_roa_origin_asn ON rpki_roa(origin_asn)",
"CREATE INDEX IF NOT EXISTS idx_rpki_aspa_customer ON rpki_aspa(customer_asn)",
"CREATE INDEX IF NOT EXISTS idx_rpki_aspa_provider ON rpki_aspa(provider_asn)",
];
pub const ASINFO_CORE_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS asinfo_core (
asn INTEGER PRIMARY KEY,
name TEXT NOT NULL,
country TEXT NOT NULL
);
"#;
pub const ASINFO_AS2ORG_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS asinfo_as2org (
asn INTEGER PRIMARY KEY,
name TEXT NOT NULL,
org_id TEXT NOT NULL,
org_name TEXT NOT NULL,
country TEXT NOT NULL
);
"#;
pub const ASINFO_PEERINGDB_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS asinfo_peeringdb (
asn INTEGER PRIMARY KEY,
name TEXT NOT NULL,
name_long TEXT,
aka TEXT,
website TEXT,
irr_as_set TEXT
);
"#;
pub const ASINFO_HEGEMONY_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS asinfo_hegemony (
asn INTEGER PRIMARY KEY,
ipv4 REAL NOT NULL,
ipv6 REAL NOT NULL
);
"#;
pub const ASINFO_POPULATION_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS asinfo_population (
asn INTEGER PRIMARY KEY,
percent_country REAL NOT NULL,
percent_global REAL NOT NULL,
sample_count INTEGER NOT NULL,
user_count INTEGER NOT NULL
);
"#;
pub const ASINFO_META_TABLE: &'static str = r#"
CREATE TABLE IF NOT EXISTS asinfo_meta (
id INTEGER PRIMARY KEY CHECK (id = 1),
source_url TEXT NOT NULL,
last_updated INTEGER NOT NULL,
core_count INTEGER NOT NULL,
as2org_count INTEGER NOT NULL,
peeringdb_count INTEGER NOT NULL,
hegemony_count INTEGER NOT NULL,
population_count INTEGER NOT NULL
);
"#;
pub const ASINFO_INDEXES: &'static [&'static str] = &[
"CREATE INDEX IF NOT EXISTS idx_asinfo_core_name ON asinfo_core(name)",
"CREATE INDEX IF NOT EXISTS idx_asinfo_core_country ON asinfo_core(country)",
"CREATE INDEX IF NOT EXISTS idx_asinfo_as2org_org_id ON asinfo_as2org(org_id)",
"CREATE INDEX IF NOT EXISTS idx_asinfo_as2org_org_name ON asinfo_as2org(org_name)",
"CREATE INDEX IF NOT EXISTS idx_asinfo_peeringdb_name ON asinfo_peeringdb(name)",
];
}
pub struct SchemaManager<'a> {
conn: &'a Connection,
}
impl<'a> SchemaManager<'a> {
pub fn new(conn: &'a Connection) -> Self {
Self { conn }
}
pub fn initialize(&self) -> Result<()> {
self.conn
.execute(SchemaDefinitions::META_TABLE, [])
.map_err(|e| anyhow!("Failed to create meta table: {}", e))?;
self.set_meta("schema_version", &SCHEMA_VERSION.to_string())?;
self.conn
.execute(SchemaDefinitions::AS2ORG_AS_TABLE, [])
.map_err(|e| anyhow!("Failed to create as2org_as table: {}", e))?;
self.conn
.execute(SchemaDefinitions::AS2ORG_ORG_TABLE, [])
.map_err(|e| anyhow!("Failed to create as2org_org table: {}", e))?;
for index_sql in SchemaDefinitions::AS2ORG_INDEXES {
self.conn
.execute(index_sql, [])
.map_err(|e| anyhow!("Failed to create AS2Org index: {}", e))?;
}
for view_sql in SchemaDefinitions::AS2ORG_VIEWS {
self.conn
.execute(view_sql, [])
.map_err(|e| anyhow!("Failed to create AS2Org view: {}", e))?;
}
self.conn
.execute(SchemaDefinitions::AS2REL_META_TABLE, [])
.map_err(|e| anyhow!("Failed to create as2rel_meta table: {}", e))?;
self.conn
.execute(SchemaDefinitions::AS2REL_TABLE, [])
.map_err(|e| anyhow!("Failed to create as2rel table: {}", e))?;
for index_sql in SchemaDefinitions::AS2REL_INDEXES {
self.conn
.execute(index_sql, [])
.map_err(|e| anyhow!("Failed to create AS2Rel index: {}", e))?;
}
self.conn
.execute(SchemaDefinitions::RPKI_ROA_TABLE, [])
.map_err(|e| anyhow!("Failed to create rpki_roa table: {}", e))?;
self.conn
.execute(SchemaDefinitions::RPKI_ASPA_TABLE, [])
.map_err(|e| anyhow!("Failed to create rpki_aspa table: {}", e))?;
self.conn
.execute(SchemaDefinitions::RPKI_META_TABLE, [])
.map_err(|e| anyhow!("Failed to create rpki_meta table: {}", e))?;
for index_sql in SchemaDefinitions::RPKI_INDEXES {
self.conn
.execute(index_sql, [])
.map_err(|e| anyhow!("Failed to create RPKI index: {}", e))?;
}
self.conn
.execute(SchemaDefinitions::ASINFO_CORE_TABLE, [])
.map_err(|e| anyhow!("Failed to create asinfo_core table: {}", e))?;
self.conn
.execute(SchemaDefinitions::ASINFO_AS2ORG_TABLE, [])
.map_err(|e| anyhow!("Failed to create asinfo_as2org table: {}", e))?;
self.conn
.execute(SchemaDefinitions::ASINFO_PEERINGDB_TABLE, [])
.map_err(|e| anyhow!("Failed to create asinfo_peeringdb table: {}", e))?;
self.conn
.execute(SchemaDefinitions::ASINFO_HEGEMONY_TABLE, [])
.map_err(|e| anyhow!("Failed to create asinfo_hegemony table: {}", e))?;
self.conn
.execute(SchemaDefinitions::ASINFO_POPULATION_TABLE, [])
.map_err(|e| anyhow!("Failed to create asinfo_population table: {}", e))?;
self.conn
.execute(SchemaDefinitions::ASINFO_META_TABLE, [])
.map_err(|e| anyhow!("Failed to create asinfo_meta table: {}", e))?;
for index_sql in SchemaDefinitions::ASINFO_INDEXES {
self.conn
.execute(index_sql, [])
.map_err(|e| anyhow!("Failed to create ASInfo index: {}", e))?;
}
Ok(())
}
pub fn check_status(&self) -> Result<SchemaStatus> {
let meta_exists: i32 = self
.conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='monocle_meta'",
[],
|row| row.get(0),
)
.unwrap_or(0);
if meta_exists == 0 {
return Ok(SchemaStatus::NotInitialized);
}
let current_version = self.get_schema_version()?;
if current_version == SCHEMA_VERSION {
if self.verify_integrity()? {
Ok(SchemaStatus::Current)
} else {
Ok(SchemaStatus::Corrupted)
}
} else if current_version < SCHEMA_VERSION {
Ok(SchemaStatus::NeedsMigration {
from: current_version,
to: SCHEMA_VERSION,
})
} else {
Ok(SchemaStatus::Incompatible {
database_version: current_version,
required_version: SCHEMA_VERSION,
})
}
}
fn get_schema_version(&self) -> Result<u32> {
let version: String = self
.conn
.query_row(
"SELECT value FROM monocle_meta WHERE key = 'schema_version'",
[],
|row| row.get(0),
)
.unwrap_or_else(|_| "0".to_string());
version
.parse()
.map_err(|e| anyhow!("Invalid schema version: {}", e))
}
fn verify_integrity(&self) -> Result<bool> {
let required_tables = [
"monocle_meta",
"as2org_as",
"as2org_org",
"as2rel",
"as2rel_meta",
"rpki_roa",
"rpki_aspa",
"rpki_meta",
"asinfo_core",
"asinfo_as2org",
"asinfo_peeringdb",
"asinfo_hegemony",
"asinfo_population",
"asinfo_meta",
];
for table in required_tables {
let exists: i32 = self
.conn
.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?1",
[table],
|row| row.get(0),
)
.unwrap_or(0);
if exists == 0 {
return Ok(false);
}
}
Ok(true)
}
pub fn set_meta(&self, key: &str, value: &str) -> Result<()> {
self.conn
.execute(
"INSERT OR REPLACE INTO monocle_meta (key, value, updated_at) VALUES (?1, ?2, strftime('%s', 'now'))",
[key, value],
)
.map_err(|e| anyhow!("Failed to set meta value: {}", e))?;
Ok(())
}
pub fn get_meta(&self, key: &str) -> Result<Option<String>> {
let result: Result<String, _> = self.conn.query_row(
"SELECT value FROM monocle_meta WHERE key = ?1",
[key],
|row| row.get(0),
);
match result {
Ok(value) => Ok(Some(value)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(anyhow!("Failed to get meta value: {}", e)),
}
}
pub fn reset(&self) -> Result<()> {
self.conn.execute("DROP VIEW IF EXISTS as2org_all", [])?;
self.conn.execute("DROP VIEW IF EXISTS as2org_count", [])?;
self.conn.execute("DROP VIEW IF EXISTS as2org_both", [])?;
self.conn.execute("DROP TABLE IF EXISTS as2rel", [])?;
self.conn.execute("DROP TABLE IF EXISTS as2rel_meta", [])?;
self.conn.execute("DROP TABLE IF EXISTS as2org_as", [])?;
self.conn.execute("DROP TABLE IF EXISTS as2org_org", [])?;
self.conn.execute("DROP TABLE IF EXISTS rpki_roa", [])?;
self.conn.execute("DROP TABLE IF EXISTS rpki_aspa", [])?;
self.conn.execute("DROP TABLE IF EXISTS rpki_meta", [])?;
self.conn.execute("DROP TABLE IF EXISTS asinfo_core", [])?;
self.conn
.execute("DROP TABLE IF EXISTS asinfo_as2org", [])?;
self.conn
.execute("DROP TABLE IF EXISTS asinfo_peeringdb", [])?;
self.conn
.execute("DROP TABLE IF EXISTS asinfo_hegemony", [])?;
self.conn
.execute("DROP TABLE IF EXISTS asinfo_population", [])?;
self.conn.execute("DROP TABLE IF EXISTS asinfo_meta", [])?;
self.conn.execute("DROP TABLE IF EXISTS monocle_meta", [])?;
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaStatus {
NotInitialized,
Current,
NeedsMigration { from: u32, to: u32 },
Incompatible {
database_version: u32,
required_version: u32,
},
Corrupted,
}
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
fn create_test_db() -> Connection {
let conn = Connection::open_in_memory().unwrap();
conn.execute("PRAGMA foreign_keys=ON", []).unwrap();
conn
}
#[test]
fn test_schema_not_initialized() {
let conn = create_test_db();
let manager = SchemaManager::new(&conn);
assert_eq!(
manager.check_status().unwrap(),
SchemaStatus::NotInitialized
);
}
#[test]
fn test_schema_initialize() {
let conn = create_test_db();
let manager = SchemaManager::new(&conn);
manager.initialize().unwrap();
assert_eq!(manager.check_status().unwrap(), SchemaStatus::Current);
}
#[test]
fn test_schema_version() {
let conn = create_test_db();
let manager = SchemaManager::new(&conn);
manager.initialize().unwrap();
let version = manager.get_schema_version().unwrap();
assert_eq!(version, SCHEMA_VERSION);
}
#[test]
fn test_meta_operations() {
let conn = create_test_db();
let manager = SchemaManager::new(&conn);
manager.initialize().unwrap();
manager.set_meta("test_key", "test_value").unwrap();
let value = manager.get_meta("test_key").unwrap();
assert_eq!(value, Some("test_value".to_string()));
let missing = manager.get_meta("nonexistent").unwrap();
assert_eq!(missing, None);
}
#[test]
fn test_schema_reset() {
let conn = create_test_db();
let manager = SchemaManager::new(&conn);
manager.initialize().unwrap();
assert_eq!(manager.check_status().unwrap(), SchemaStatus::Current);
manager.reset().unwrap();
assert_eq!(
manager.check_status().unwrap(),
SchemaStatus::NotInitialized
);
}
}