use crate::core::{Mirror, TestResult};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection, OptionalExtension, Row};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::{debug, info};
use url::Url;
/// Handle to the SQLite database that stores mirrors, test results and update history.
///
/// Clones share the same underlying connection through the `Arc<Mutex<_>>`.
#[derive(Clone)]
pub struct Database {
/// Shared SQLite connection; database operations lock the mutex while they run.
conn: Arc<Mutex<Connection>>,
/// Location the database was opened from (`:memory:` for in-memory databases).
path: PathBuf,
}
/// One row of the `updates` history table.
#[derive(Debug, Clone)]
pub struct UpdateRecord {
/// Auto-incremented primary key of the record.
pub id: i64,
/// Number of mirrors changed by the update, as supplied when the record was saved.
pub mirrors_changed: i64,
/// Whether the update was recorded as successful.
pub success: bool,
/// Optional error message stored with the record.
pub error: Option<String>,
/// Timestamp of the record, stored as RFC 3339 text and converted to UTC on read.
pub updated_at: DateTime<Utc>,
}
/// Criteria for `Database::get_test_results`; the default value matches every result.
#[derive(Debug, Clone, Default)]
pub struct TestResultFilter {
/// Only return results whose mirror URL equals this string exactly.
pub mirror_url: Option<String>,
/// When `true`, only successful results are returned.
pub success_only: bool,
/// Inclusive lower bound on the test timestamp.
pub since: Option<DateTime<Utc>>,
/// Inclusive upper bound on the test timestamp.
pub until: Option<DateTime<Utc>>,
/// Maximum number of results to return (newest first).
pub limit: Option<usize>,
}
/// Criteria for `Database::get_mirrors`; the default value matches every mirror.
#[derive(Debug, Clone, Default)]
pub struct MirrorFilter {
/// When `true`, only mirrors flagged as static are returned.
pub static_only: bool,
/// When `true`, only mirrors with at least one successful test result are returned.
pub tested_only: bool,
/// Only return mirrors whose stored country equals this string exactly.
pub country: Option<String>,
/// Minimum score of the latest successful test; mirrors without a score are excluded
/// when this is set.
pub min_score: Option<f64>,
}
impl Database {
/// Opens the SQLite database file at `path`, creating it and its parent directory
/// if needed, and makes sure the schema exists.
///
/// # Errors
///
/// Fails when the parent directory cannot be created, the database cannot be
/// opened, or schema initialization fails.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
    let db_path = path.as_ref().to_path_buf();

    if let Some(dir) = db_path.parent() {
        std::fs::create_dir_all(dir)
            .with_context(|| format!("Failed to create database directory: {:?}", dir))?;
    }

    let connection = Connection::open(&db_path)
        .with_context(|| format!("Failed to open database at {:?}", db_path))?;
    let database = Self {
        conn: Arc::new(Mutex::new(connection)),
        path: db_path,
    };

    database.init_schema()?;
    info!("Database initialized at {:?}", database.path);
    Ok(database)
}
/// Creates a database that lives only in memory, with the schema already set up.
///
/// The reported path is the literal `:memory:`.
///
/// # Errors
///
/// Fails when the in-memory connection cannot be opened or schema
/// initialization fails.
pub fn in_memory() -> Result<Self> {
    let connection =
        Connection::open_in_memory().context("Failed to open in-memory database")?;
    let database = Self {
        path: PathBuf::from(":memory:"),
        conn: Arc::new(Mutex::new(connection)),
    };

    database.init_schema()?;
    debug!("In-memory database initialized");
    Ok(database)
}
/// Creates the tables and indexes used by this module if they do not exist yet.
///
/// Also switches on foreign-key enforcement for the connection: SQLite ignores
/// `FOREIGN KEY ... ON DELETE CASCADE` clauses unless `PRAGMA foreign_keys` is
/// enabled, so without it the constraint declared on `test_results.mirror_id`
/// would have no effect.
///
/// # Errors
///
/// Fails when the pragma or any schema statement cannot be executed.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
fn init_schema(&self) -> Result<()> {
    let conn = self.conn.lock().unwrap();

    // Must run outside a transaction; it is a per-connection setting.
    conn.execute_batch("PRAGMA foreign_keys = ON;")
        .context("Failed to enable foreign key enforcement")?;

    conn.execute_batch(
        r#"
        -- Mirrors table
        CREATE TABLE IF NOT EXISTS mirrors (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            url TEXT NOT NULL UNIQUE,
            country TEXT,
            is_static INTEGER NOT NULL DEFAULT 0,
            created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        -- Test results table
        CREATE TABLE IF NOT EXISTS test_results (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            mirror_id INTEGER NOT NULL,
            speed REAL,
            latency INTEGER,
            score REAL,
            success INTEGER NOT NULL,
            error TEXT,
            tested_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (mirror_id) REFERENCES mirrors (id) ON DELETE CASCADE
        );
        -- Update history table
        CREATE TABLE IF NOT EXISTS updates (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            mirrors_changed INTEGER NOT NULL,
            success INTEGER NOT NULL,
            error TEXT,
            updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
        -- Indexes for performance
        CREATE INDEX IF NOT EXISTS idx_test_results_mirror_id
            ON test_results(mirror_id);
        CREATE INDEX IF NOT EXISTS idx_test_results_tested_at
            ON test_results(tested_at DESC);
        CREATE INDEX IF NOT EXISTS idx_mirrors_url
            ON mirrors(url);
        CREATE INDEX IF NOT EXISTS idx_mirrors_is_static
            ON mirrors(is_static);
        CREATE INDEX IF NOT EXISTS idx_updates_updated_at
            ON updates(updated_at DESC);
        "#,
    )
    .context("Failed to initialize database schema")?;

    debug!("Database schema initialized successfully");
    Ok(())
}
/// Inserts `mirror`, or updates its country and static flag when a row with the
/// same URL already exists, and returns the row ID of that mirror.
///
/// # Errors
///
/// Fails when the upsert or the follow-up ID lookup fails.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn save_mirror(&self, mirror: &Mirror) -> Result<i64> {
    let conn = self.conn.lock().unwrap();
    let url = mirror.url.to_string();

    conn.execute(
        r#"
        INSERT INTO mirrors (url, country, is_static)
        VALUES (?1, ?2, ?3)
        ON CONFLICT(url) DO UPDATE SET
            country = excluded.country,
            is_static = excluded.is_static
        "#,
        params![
            url,
            mirror.country.as_deref(),
            mirror.is_static as i32,
        ],
    )
    .with_context(|| format!("Failed to save mirror: {}", url))?;

    // `last_insert_rowid()` only changes on a successful INSERT. When the upsert
    // takes the DO UPDATE branch it still holds the ID of some earlier insert
    // (possibly into another table), so the ID is looked up by URL instead.
    let id: i64 = conn
        .query_row(
            "SELECT id FROM mirrors WHERE url = ?1",
            params![url],
            |row| row.get(0),
        )
        .with_context(|| format!("Failed to look up ID of saved mirror: {}", url))?;

    debug!("Saved mirror {} with ID {}", url, id);
    Ok(id)
}
/// Looks up the mirror stored under exactly `url`.
///
/// Returns `Ok(None)` when no such row exists. A found mirror is filled in
/// with the stats of its most recent successful test, if any.
///
/// # Errors
///
/// Fails when a query fails or the stored URL cannot be parsed.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn get_mirror(&self, url: &str) -> Result<Option<Mirror>> {
    let conn = self.conn.lock().unwrap();

    let row = conn
        .query_row(
            "SELECT url, country, is_static FROM mirrors WHERE url = ?1",
            params![url],
            |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, Option<String>>(1)?,
                    row.get::<_, i32>(2)?,
                ))
            },
        )
        .optional()
        .context("Failed to query mirror")?;

    // Guard clause: nothing stored under this URL.
    let (stored_url, country, static_flag) = match row {
        Some(columns) => columns,
        None => return Ok(None),
    };

    let parsed = Url::parse(&stored_url)
        .with_context(|| format!("Invalid URL in database: {}", stored_url))?;
    let mut mirror = Mirror::new(parsed);
    mirror.country = country;
    mirror.is_static = static_flag != 0;
    self.populate_mirror_stats(&conn, &mut mirror)?;
    Ok(Some(mirror))
}
/// Returns the primary key of the mirror stored under `url`, or `None` when
/// there is no such row. Uses the caller's already-locked connection.
fn get_mirror_id(&self, conn: &Connection, url: &str) -> Result<Option<i64>> {
    let lookup = conn
        .query_row(
            "SELECT id FROM mirrors WHERE url = ?1",
            params![url],
            |row| row.get::<_, i64>(0),
        )
        .optional();
    lookup.context("Failed to get mirror ID")
}
/// Copies speed, latency, score and test time from the mirror's most recent
/// successful test result onto `mirror`.
///
/// The mirror is left untouched when it has no successful result. A stored
/// timestamp that is not valid RFC 3339 is ignored and leaves `last_tested`
/// as it was.
fn populate_mirror_stats(&self, conn: &Connection, mirror: &mut Mirror) -> Result<()> {
    let mirror_url = mirror.url.to_string();

    let latest = conn
        .query_row(
            r#"
SELECT speed, latency, score, tested_at
FROM test_results
WHERE mirror_id = (SELECT id FROM mirrors WHERE url = ?1)
AND success = 1
ORDER BY tested_at DESC
LIMIT 1
"#,
            params![mirror_url],
            |row| {
                Ok((
                    row.get::<_, Option<f64>>(0)?,
                    row.get::<_, Option<i64>>(1)?,
                    row.get::<_, Option<f64>>(2)?,
                    row.get::<_, String>(3)?,
                ))
            },
        )
        .optional()
        .context("Failed to query mirror stats")?;

    // No successful test on record: nothing to fill in.
    let (speed, latency_ms, score, tested_at_raw) = match latest {
        Some(stats) => stats,
        None => return Ok(()),
    };

    mirror.speed = speed;
    mirror.latency = latency_ms.map(|ms| Duration::from_millis(ms as u64));
    mirror.score = score;
    if let Ok(parsed) = DateTime::parse_from_rfc3339(&tested_at_raw) {
        mirror.last_tested = Some(parsed.with_timezone(&Utc));
    }
    Ok(())
}
/// Stores `result` in `test_results`, first inserting its mirror when no row
/// with that URL exists yet.
///
/// Latency is stored in whole milliseconds and the test time as RFC 3339 text.
///
/// # Errors
///
/// Fails when the mirror lookup or either insert fails.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn save_test_result(&self, result: &TestResult) -> Result<()> {
    let conn = self.conn.lock().unwrap();
    let tested_mirror = &result.mirror;
    let url = tested_mirror.url.to_string();

    let known_id = self.get_mirror_id(&conn, &url)?;
    let mirror_id = if let Some(id) = known_id {
        id
    } else {
        // Unknown mirror: register it so the result has a row to reference.
        conn.execute(
            "INSERT INTO mirrors (url, country, is_static) VALUES (?1, ?2, ?3)",
            params![
                url,
                tested_mirror.country.as_deref(),
                tested_mirror.is_static as i32,
            ],
        )
        .context("Failed to insert mirror for test result")?;
        conn.last_insert_rowid()
    };

    let latency_ms = result.latency.map(|d| d.as_millis() as i64);
    let success_flag = result.success as i32;
    let tested_at = result.tested_at.to_rfc3339();

    conn.execute(
        r#"
INSERT INTO test_results
(mirror_id, speed, latency, score, success, error, tested_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
"#,
        params![
            mirror_id,
            result.speed,
            latency_ms,
            result.score,
            success_flag,
            result.error.as_deref(),
            tested_at,
        ],
    )
    .with_context(|| format!("Failed to save test result for mirror: {}", url))?;

    debug!("Saved test result for mirror: {}", url);
    Ok(())
}
/// Returns the test results matching `filter`, newest first.
///
/// Each active filter field appends one `AND` clause. Values are passed as
/// bound parameters in clause order; only the numeric `limit` is formatted
/// into the SQL text.
///
/// # Errors
///
/// Fails when the query cannot be prepared or run, or a row cannot be
/// converted.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn get_test_results(&self, filter: &TestResultFilter) -> Result<Vec<TestResult>> {
    let conn = self.conn.lock().unwrap();

    let mut sql = String::from(
        r#"
SELECT
m.url, m.country, m.is_static,
tr.speed, tr.latency, tr.score,
tr.success, tr.error, tr.tested_at
FROM test_results tr
JOIN mirrors m ON tr.mirror_id = m.id
WHERE 1=1
"#,
    );
    // Clause text and bound value are added together so their order always agrees.
    let mut bound: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();

    if let Some(url) = &filter.mirror_url {
        sql.push_str(" AND m.url = ?");
        bound.push(Box::new(url.clone()));
    }
    if filter.success_only {
        sql.push_str(" AND tr.success = 1");
    }
    if let Some(since) = filter.since {
        sql.push_str(" AND tr.tested_at >= ?");
        bound.push(Box::new(since.to_rfc3339()));
    }
    if let Some(until) = filter.until {
        sql.push_str(" AND tr.tested_at <= ?");
        bound.push(Box::new(until.to_rfc3339()));
    }

    sql.push_str(" ORDER BY tr.tested_at DESC");
    if let Some(limit) = filter.limit {
        sql.push_str(&format!(" LIMIT {}", limit));
    }

    let mut stmt = conn
        .prepare(&sql)
        .context("Failed to prepare test results query")?;

    let bound_refs: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|value| value.as_ref()).collect();
    let rows = stmt.query_map(bound_refs.as_slice(), Self::row_to_test_result)?;
    let results = rows
        .collect::<std::result::Result<Vec<_>, _>>()
        .context("Failed to map test results")?;

    debug!("Retrieved {} test results", results.len());
    Ok(results)
}
fn row_to_test_result(row: &Row) -> rusqlite::Result<TestResult> {
let url_str: String = row.get(0)?;
let country: Option<String> = row.get(1)?;
let is_static: i32 = row.get(2)?;
let speed: Option<f64> = row.get(3)?;
let latency_ms: Option<i64> = row.get(4)?;
let score: Option<f64> = row.get(5)?;
let success: i32 = row.get(6)?;
let error: Option<String> = row.get(7)?;
let tested_at_str: String = row.get(8)?;
let url = Url::parse(&url_str)
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
let mut mirror = Mirror::new(url);
mirror.country = country;
mirror.is_static = is_static != 0;
mirror.speed = speed;
mirror.latency = latency_ms.map(|ms| Duration::from_millis(ms as u64));
mirror.score = score;
let tested_at = DateTime::parse_from_rfc3339(&tested_at_str)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
Ok(TestResult {
mirror,
success: success != 0,
speed,
latency: latency_ms.map(|ms| Duration::from_millis(ms as u64)),
score,
error,
tested_at,
})
}
pub fn save_update_record(
&self,
mirrors_changed: i64,
success: bool,
error: Option<String>,
) -> Result<i64> {
let conn = self.conn.lock().unwrap();
conn.execute(
r#"
INSERT INTO updates (mirrors_changed, success, error, updated_at)
VALUES (?1, ?2, ?3, ?4)
"#,
params![
mirrors_changed,
success as i32,
error.as_deref(),
Utc::now().to_rfc3339(),
],
)
.context("Failed to save update record")?;
let id = conn.last_insert_rowid();
info!(
"Saved update record: {} mirrors changed, success: {}",
mirrors_changed, success
);
Ok(id)
}
pub fn get_history(&self, limit: usize) -> Result<Vec<UpdateRecord>> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
r#"
SELECT id, mirrors_changed, success, error, updated_at
FROM updates
ORDER BY updated_at DESC
LIMIT ?1
"#,
)
.context("Failed to prepare history query")?;
let records = stmt
.query_map(params![limit], |row| {
let id: i64 = row.get(0)?;
let mirrors_changed: i64 = row.get(1)?;
let success: i32 = row.get(2)?;
let error: Option<String> = row.get(3)?;
let updated_at_str: String = row.get(4)?;
let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
Ok(UpdateRecord {
id,
mirrors_changed,
success: success != 0,
error,
updated_at,
})
})?
.collect::<std::result::Result<Vec<_>, _>>()
.context("Failed to map update records")?;
debug!("Retrieved {} update records", records.len());
Ok(records)
}
pub fn get_mirrors(&self, filter: &MirrorFilter) -> Result<Vec<Mirror>> {
let conn = self.conn.lock().unwrap();
let mut query = String::from("SELECT DISTINCT m.url, m.country, m.is_static FROM mirrors m");
let mut conditions = Vec::new();
if filter.tested_only {
query.push_str(" JOIN test_results tr ON m.id = tr.mirror_id");
conditions.push("tr.success = 1");
}
if filter.static_only {
conditions.push("m.is_static = 1");
}
if !conditions.is_empty() {
query.push_str(" WHERE ");
query.push_str(&conditions.join(" AND "));
}
query.push_str(" ORDER BY m.url");
let mut stmt = conn.prepare(&query)
.context("Failed to prepare mirrors query")?;
let mirrors = stmt
.query_map([], |row| {
let url_str: String = row.get(0)?;
let country: Option<String> = row.get(1)?;
let is_static: i32 = row.get(2)?;
Ok((url_str, country, is_static))
})?
.collect::<std::result::Result<Vec<_>, _>>()
.context("Failed to map mirrors")?;
let mut result = Vec::new();
for (url_str, country, is_static) in mirrors {
let url = Url::parse(&url_str)
.with_context(|| format!("Invalid URL in database: {}", url_str))?;
let mut mirror = Mirror::new(url);
mirror.country = country;
mirror.is_static = is_static != 0;
self.populate_mirror_stats(&conn, &mut mirror)?;
if let Some(min_score) = filter.min_score {
if let Some(score) = mirror.score {
if score < min_score {
continue;
}
} else {
continue;
}
}
if let Some(ref filter_country) = filter.country {
if mirror.country.as_ref() != Some(filter_country) {
continue;
}
}
result.push(mirror);
}
debug!("Retrieved {} mirrors", result.len());
Ok(result)
}
/// Deletes every test result whose timestamp is earlier than `older_than_days`
/// days before now, and returns how many rows were removed.
///
/// # Errors
///
/// Fails when the delete statement fails.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn delete_old_results(&self, older_than_days: i64) -> Result<usize> {
    let conn = self.conn.lock().unwrap();

    let retention = chrono::Duration::days(older_than_days);
    let cutoff_text = (Utc::now() - retention).to_rfc3339();

    let removed = conn
        .execute(
            "DELETE FROM test_results WHERE tested_at < ?1",
            params![cutoff_text],
        )
        .context("Failed to delete old test results")?;

    if removed != 0 {
        info!("Deleted {} old test results", removed);
    }
    Ok(removed)
}
/// Returns the most recent update record, or `None` when the history is empty.
///
/// When several records share the newest timestamp, the one with the highest
/// ID (the most recently inserted) is returned, so the result is deterministic.
///
/// # Errors
///
/// Fails when the query fails or the stored timestamp is not valid RFC 3339.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn get_latest_update(&self) -> Result<Option<UpdateRecord>> {
    let conn = self.conn.lock().unwrap();

    let result = conn
        .query_row(
            r#"
            SELECT id, mirrors_changed, success, error, updated_at
            FROM updates
            ORDER BY updated_at DESC, id DESC
            LIMIT 1
            "#,
            [],
            |row| {
                let id: i64 = row.get(0)?;
                let mirrors_changed: i64 = row.get(1)?;
                let success: i32 = row.get(2)?;
                let error: Option<String> = row.get(3)?;
                let updated_at_str: String = row.get(4)?;
                Ok((id, mirrors_changed, success, error, updated_at_str))
            },
        )
        .optional()
        .context("Failed to query latest update")?;

    let (id, mirrors_changed, success, error, updated_at_str) = match result {
        Some(columns) => columns,
        None => return Ok(None),
    };

    let updated_at = DateTime::parse_from_rfc3339(&updated_at_str)
        .map(|dt| dt.with_timezone(&Utc))
        .context("Failed to parse update timestamp")?;

    Ok(Some(UpdateRecord {
        id,
        mirrors_changed,
        success: success != 0,
        error,
        updated_at,
    }))
}
/// Returns the path this database was opened with (`:memory:` for in-memory
/// databases).
pub fn path(&self) -> &Path {
    self.path.as_path()
}
/// Runs SQLite's `VACUUM` command on the database.
///
/// # Errors
///
/// Fails when the command fails.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn vacuum(&self) -> Result<()> {
    let guard = self.conn.lock().unwrap();
    let outcome = guard.execute_batch("VACUUM");
    outcome.context("Failed to vacuum database")?;
    info!("Database vacuumed successfully");
    Ok(())
}
pub fn get_stats(&self) -> Result<DatabaseStats> {
let conn = self.conn.lock().unwrap();
let mirrors_count: i64 = conn.query_row(
"SELECT COUNT(*) FROM mirrors",
[],
|row| row.get(0),
)?;
let test_results_count: i64 = conn.query_row(
"SELECT COUNT(*) FROM test_results",
[],
|row| row.get(0),
)?;
let updates_count: i64 = conn.query_row(
"SELECT COUNT(*) FROM updates",
[],
|row| row.get(0),
)?;
let static_mirrors_count: i64 = conn.query_row(
"SELECT COUNT(*) FROM mirrors WHERE is_static = 1",
[],
|row| row.get(0),
)?;
Ok(DatabaseStats {
mirrors_count: mirrors_count as usize,
test_results_count: test_results_count as usize,
updates_count: updates_count as usize,
static_mirrors_count: static_mirrors_count as usize,
})
}
}
/// Row counts reported by `Database::get_stats`.
#[derive(Debug, Clone)]
pub struct DatabaseStats {
/// Total number of rows in the `mirrors` table.
pub mirrors_count: usize,
/// Total number of rows in the `test_results` table.
pub test_results_count: usize,
/// Total number of rows in the `updates` table.
pub updates_count: usize,
/// Number of mirrors whose `is_static` flag is set.
pub static_mirrors_count: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `url` and wraps it in a `Mirror`; panics on an invalid URL.
    fn create_test_mirror(url: &str) -> Mirror {
        Mirror::new(Url::parse(url).unwrap())
    }

    /// An in-memory database reports `:memory:` as its path.
    #[test]
    fn test_database_creation() {
        let database = Database::in_memory().unwrap();
        assert_eq!(database.path(), Path::new(":memory:"));
    }

    /// A saved mirror can be read back under the same URL.
    #[test]
    fn test_save_and_get_mirror() {
        let database = Database::in_memory().unwrap();
        let original = create_test_mirror("https://mirror.example.com/debian");
        database.save_mirror(&original).unwrap();

        let fetched = database
            .get_mirror("https://mirror.example.com/debian")
            .unwrap();
        assert!(fetched.is_some());
        let fetched = fetched.unwrap();
        assert_eq!(fetched.url, original.url);
    }

    /// The static flag survives a save/load round trip.
    #[test]
    fn test_save_static_mirror() {
        let database = Database::in_memory().unwrap();
        let static_mirror =
            Mirror::new_static(Url::parse("https://static.example.com/repo").unwrap());
        database.save_mirror(&static_mirror).unwrap();

        let fetched = database
            .get_mirror("https://static.example.com/repo")
            .unwrap()
            .unwrap();
        assert!(fetched.is_static);
    }

    /// A stored result is returned when filtering by its mirror URL.
    #[test]
    fn test_save_test_result() {
        let database = Database::in_memory().unwrap();
        let outcome = TestResult::success(
            create_test_mirror("https://mirror.example.com/debian"),
            50.0,
            Duration::from_millis(100),
            0.75,
        );
        database.save_test_result(&outcome).unwrap();

        let by_url = TestResultFilter {
            mirror_url: Some("https://mirror.example.com/debian".to_string()),
            ..Default::default()
        };
        let found = database.get_test_results(&by_url).unwrap();
        assert_eq!(found.len(), 1);
        assert!(found[0].success);
        assert_eq!(found[0].speed, Some(50.0));
    }

    /// `success_only` hides failed results.
    #[test]
    fn test_get_test_results_with_filter() {
        let database = Database::in_memory().unwrap();
        let passing = TestResult::success(
            create_test_mirror("https://mirror1.example.com/repo"),
            50.0,
            Duration::from_millis(100),
            0.75,
        );
        let failing = TestResult::failure(
            create_test_mirror("https://mirror2.example.com/repo"),
            "Test error".to_string(),
        );
        database.save_test_result(&passing).unwrap();
        database.save_test_result(&failing).unwrap();

        let successes = TestResultFilter {
            success_only: true,
            ..Default::default()
        };
        let found = database.get_test_results(&successes).unwrap();
        assert_eq!(found.len(), 1);
        assert!(found[0].success);
    }

    /// A saved update record appears in the history.
    #[test]
    fn test_save_update_record() {
        let database = Database::in_memory().unwrap();
        database.save_update_record(5, true, None).unwrap();

        let entries = database.get_history(10).unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].mirrors_changed, 5);
        assert!(entries[0].success);
    }

    /// The most recently saved update is reported as the latest.
    #[test]
    fn test_get_latest_update() {
        let database = Database::in_memory().unwrap();
        database.save_update_record(3, true, None).unwrap();
        // Ensure the two records get distinct timestamps.
        std::thread::sleep(Duration::from_millis(10));
        database.save_update_record(5, true, None).unwrap();

        let newest = database.get_latest_update().unwrap();
        assert!(newest.is_some());
        assert_eq!(newest.unwrap().mirrors_changed, 5);
    }

    /// `static_only` returns only mirrors flagged as static.
    #[test]
    fn test_get_mirrors_with_filter() {
        let database = Database::in_memory().unwrap();
        let regular = create_test_mirror("https://mirror1.example.com/repo");
        let pinned = Mirror::new_static(Url::parse("https://mirror2.example.com/repo").unwrap());
        database.save_mirror(&regular).unwrap();
        database.save_mirror(&pinned).unwrap();

        let only_static = MirrorFilter {
            static_only: true,
            ..Default::default()
        };
        let found = database.get_mirrors(&only_static).unwrap();
        assert_eq!(found.len(), 1);
        assert!(found[0].is_static);
    }

    /// A zero-day retention removes results that were just stored.
    #[test]
    fn test_delete_old_results() {
        let database = Database::in_memory().unwrap();
        let outcome = TestResult::success(
            create_test_mirror("https://mirror.example.com/repo"),
            50.0,
            Duration::from_millis(100),
            0.75,
        );
        database.save_test_result(&outcome).unwrap();

        let removed = database.delete_old_results(0).unwrap();
        assert!(removed > 0);

        let remaining = database
            .get_test_results(&TestResultFilter::default())
            .unwrap();
        assert_eq!(remaining.len(), 0);
    }

    /// Stats reflect the number of stored mirrors, results and updates.
    #[test]
    fn test_database_stats() {
        let database = Database::in_memory().unwrap();
        let first = create_test_mirror("https://mirror1.example.com/repo");
        let second = create_test_mirror("https://mirror2.example.com/repo");
        database.save_mirror(&first).unwrap();
        database.save_mirror(&second).unwrap();
        database
            .save_test_result(&TestResult::success(
                first,
                50.0,
                Duration::from_millis(100),
                0.75,
            ))
            .unwrap();
        database.save_update_record(2, true, None).unwrap();

        let counts = database.get_stats().unwrap();
        assert_eq!(counts.mirrors_count, 2);
        assert_eq!(counts.test_results_count, 1);
        assert_eq!(counts.updates_count, 1);
    }

    /// A fetched mirror carries the stats of its latest successful test.
    #[test]
    fn test_populate_mirror_stats() {
        let database = Database::in_memory().unwrap();
        let probe = create_test_mirror("https://mirror.example.com/repo");
        let outcome = TestResult::success(probe.clone(), 75.5, Duration::from_millis(50), 0.85);
        database.save_test_result(&outcome).unwrap();

        let fetched = database
            .get_mirror("https://mirror.example.com/repo")
            .unwrap()
            .unwrap();
        assert_eq!(fetched.speed, Some(75.5));
        assert_eq!(fetched.latency, Some(Duration::from_millis(50)));
        assert_eq!(fetched.score, Some(0.85));
        assert!(fetched.last_tested.is_some());
    }
}