use super::DistroHandler;
use crate::core::Mirror;
use anyhow::{Context, Result};
use async_trait::async_trait;
use regex::Regex;
use reqwest::Client;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use tracing::{debug, info, warn};
use url::Url;
/// Mirror handler for APT-based distributions (Debian and Ubuntu).
///
/// Holds the filesystem locations the handler reads and writes, plus the
/// distribution variant detected when the handler was constructed.
pub struct AptHandler {
    /// Main one-line-style sources file (`/etc/apt/sources.list` when built via `new`).
    sources_file: PathBuf,
    /// Directory scanned for additional `*.list` files.
    sources_dir: PathBuf,
    /// Directory in which timestamped copies of `sources_file` are stored.
    backup_dir: PathBuf,
    /// Selects which upstream mirror list is fetched and which name is reported.
    distro_variant: AptVariant,
}
/// Distribution flavour handled by [`AptHandler`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AptVariant {
    /// Used whenever no Ubuntu marker is found during detection.
    Debian,
    /// Used when `/etc/os-release` or `/etc/lsb-release` mentions Ubuntu.
    Ubuntu,
}
/// One parsed one-line-style APT source entry, as produced by
/// `AptHandler::parse_source_lines`.
#[derive(Debug, Clone)]
struct SourceLine {
    /// `false` when the entry is preceded by a `#` comment marker.
    enabled: bool,
    // `source_type` is the entry keyword (`deb` or `deb-src`); `options` holds the
    // whitespace-separated tokens found inside the optional `[...]` group.
    source_type: String, options: Vec<String>,
    /// Repository URL exactly as written in the file.
    url: String,
    /// Distribution (suite) token that follows the URL.
    distribution: String,
    /// Whitespace-separated tokens that follow the distribution, if any.
    components: Vec<String>,
    /// The full, unmodified text of the line the entry was parsed from.
    original_line: String,
    /// Zero-based index of the line within the parsed content.
    line_number: usize,
}
impl AptHandler {
pub fn new() -> Self {
let variant = Self::detect_variant();
Self {
sources_file: PathBuf::from("/etc/apt/sources.list"),
sources_dir: PathBuf::from("/etc/apt/sources.list.d"),
backup_dir: PathBuf::from("/var/backups/smirrors/apt"),
distro_variant: variant,
}
}
/// Decides whether the host looks like Ubuntu; anything else is treated as Debian.
///
/// `/etc/os-release` is consulted first (matching either `Ubuntu` or
/// `ID=ubuntu`), then `/etc/lsb-release` (matching `Ubuntu`). Files that are
/// missing or unreadable simply do not count as a match.
fn detect_variant() -> AptVariant {
    // True when `path` can be read and contains at least one of `needles`.
    let mentions = |path: &str, needles: &[&str]| -> bool {
        fs::read_to_string(path)
            .map(|text| needles.iter().any(|needle| text.contains(*needle)))
            .unwrap_or(false)
    };

    let lsb_release = "/etc/lsb-release";
    let is_ubuntu = mentions("/etc/os-release", &["Ubuntu", "ID=ubuntu"])
        || (Path::new(lsb_release).exists() && mentions(lsb_release, &["Ubuntu"]));

    if is_ubuntu {
        AptVariant::Ubuntu
    } else {
        AptVariant::Debian
    }
}
/// Collects the mirrors referenced by the main sources file and by every
/// `*.list` file in the sources directory.
///
/// # Errors
///
/// Fails when the main sources file exists but cannot be read, when the
/// sources directory cannot be listed, or when any content fails to parse.
/// A `*.list` file that cannot be read is skipped rather than reported.
fn parse_sources_list(&self) -> Result<Vec<Mirror>> {
    let mut found = Vec::new();

    if self.sources_file.exists() {
        let text = fs::read_to_string(&self.sources_file)?;
        found.extend(self.extract_mirrors_from_content(&text)?);
    }

    if !self.sources_dir.exists() {
        return Ok(found);
    }

    for dir_entry in fs::read_dir(&self.sources_dir)? {
        let candidate = dir_entry?.path();
        let is_list_file = candidate.extension().and_then(|ext| ext.to_str()) == Some("list");
        if !is_list_file {
            continue;
        }
        // Unreadable fragments are ignored on purpose; parse errors still propagate.
        if let Ok(text) = fs::read_to_string(&candidate) {
            found.extend(self.extract_mirrors_from_content(&text)?);
        }
    }

    Ok(found)
}
/// Turns the enabled `deb` entries of `content` into a list of unique mirrors.
///
/// Entries are de-duplicated on their parsed URL with trailing slashes
/// removed; the first occurrence wins. Each mirror carries the entry's
/// distribution and space-joined components as metadata. Entries whose URL
/// does not parse are skipped.
fn extract_mirrors_from_content(&self, content: &str) -> Result<Vec<Mirror>> {
    let mut seen = std::collections::HashSet::new();
    let mut result = Vec::new();

    let binary_entries = self
        .parse_source_lines(content)?
        .into_iter()
        .filter(|entry| entry.enabled && entry.source_type == "deb");

    for entry in binary_entries {
        let parsed = match Url::parse(&entry.url) {
            Ok(parsed) => parsed,
            Err(_) => continue,
        };

        // `insert` returns false when the key was already present.
        let key = parsed.as_str().trim_end_matches('/').to_string();
        if !seen.insert(key) {
            continue;
        }

        let mut mirror = Mirror::new(parsed);
        if let Some(country) = Self::extract_country_from_hostname(&entry.url) {
            mirror.country = Some(country);
        }
        mirror
            .metadata
            .insert("distribution".to_string(), entry.distribution.clone());
        mirror
            .metadata
            .insert("components".to_string(), entry.components.join(" "));
        result.push(mirror);
    }

    Ok(result)
}
/// Parses one-line-style APT source entries out of `content`.
///
/// Every line that matches the `deb` / `deb-src` entry grammar yields one
/// [`SourceLine`], including entries disabled with a leading `#` (reported
/// with `enabled == false`). Blank lines and lines that do not match are
/// skipped. `line_number` is the zero-based index of the line in `content`.
///
/// A `#` after the distribution starts a trailing comment, so anything from
/// that `#` onwards is excluded from `components`. `original_line` always
/// keeps the untouched text.
///
/// # Errors
///
/// Returns an error only if the entry regex fails to compile.
fn parse_source_lines(&self, content: &str) -> Result<Vec<SourceLine>> {
    // Splits a captured group into owned whitespace-separated tokens.
    fn split_tokens(text: &str) -> Vec<String> {
        text.split_whitespace().map(str::to_string).collect()
    }

    let re = Regex::new(
        r"(?x)
^
\s*
(?P<comment>\#?) # Optional comment marker
\s*
(?P<type>deb|deb-src) # Source type
\s+
(?:\[(?P<options>[^\]]+)\]\s+)? # Optional options in brackets
(?P<url>\S+) # URL
\s+
(?P<dist>\S+) # Distribution
(?:\s+(?P<components>.+))? # Optional components
\s*
$
",
    )?;

    let mut sources = Vec::new();
    for (line_num, line) in content.lines().enumerate() {
        if line.trim().is_empty() {
            continue;
        }
        let caps = match re.captures(line) {
            Some(caps) => caps,
            None => continue,
        };

        let is_commented = caps
            .name("comment")
            .map_or(false, |m| !m.as_str().is_empty());
        let options = caps
            .name("options")
            .map(|m| split_tokens(m.as_str()))
            .unwrap_or_default();
        let components = caps
            .name("components")
            .map(|m| {
                let text = m.as_str();
                // `#` marks the rest of the line as a comment; keep only what precedes it.
                let without_comment = text.split('#').next().unwrap_or(text);
                split_tokens(without_comment)
            })
            .unwrap_or_default();

        sources.push(SourceLine {
            enabled: !is_commented,
            source_type: caps["type"].to_string(),
            options,
            url: caps["url"].to_string(),
            distribution: caps["dist"].to_string(),
            components,
            original_line: line.to_string(),
            line_number: line_num,
        });
    }

    Ok(sources)
}
/// Guesses a country code from the host part of `url`.
///
/// Returns the first dot-separated host label that consists of exactly two
/// ASCII letters, upper-cased. Returns `None` when the URL does not parse,
/// has no host, or has no such label.
///
/// NOTE(review): the label is not checked against a list of real country
/// codes, so any two-letter label (e.g. a department name) is reported.
fn extract_country_from_hostname(url: &str) -> Option<String> {
    let parsed = Url::parse(url).ok()?;
    let host = parsed.host_str()?;
    host.split('.')
        .find(|label| label.len() == 2 && label.chars().all(|c| c.is_ascii_alphabetic()))
        .map(|label| label.to_uppercase())
}
/// Downloads the Debian mirror list page and extracts the mirrors it names.
///
/// # Errors
///
/// Fails when the request cannot be sent, when the server answers with a
/// 4xx/5xx status, when the body cannot be read, or when parsing fails.
/// Rejecting error statuses keeps an error page from being parsed as if it
/// were the mirror list.
async fn fetch_debian_mirrors(&self) -> Result<Vec<Mirror>> {
    let url = "https://www.debian.org/mirror/list";
    let html = Client::new()
        .get(url)
        .send()
        .await
        .context("Failed to fetch Debian mirror list")?
        .error_for_status()
        .context("Debian mirror list request returned an error status")?
        .text()
        .await
        .context("Failed to read Debian mirror list")?;
    self.parse_debian_mirror_html(&html)
}
/// Extracts Debian archive mirrors from the mirror list page.
///
/// Every `http(s)://…/debian` URL found in `html` becomes one mirror, in
/// order of first appearance. The same URL commonly occurs more than once
/// in the page (link target and link text), so URLs are de-duplicated after
/// trailing slashes are removed. Matches that do not parse as URLs are
/// skipped.
///
/// # Errors
///
/// Returns an error only if the URL regex fails to compile.
fn parse_debian_mirror_html(&self, html: &str) -> Result<Vec<Mirror>> {
    let re = Regex::new(r#"(?:https?://[^\s"<>]+/debian/?)"#)?;
    let mut seen = std::collections::HashSet::new();
    let mut mirrors = Vec::new();

    for found in re.find_iter(html) {
        let url_str = found.as_str().trim_end_matches('/');
        // `insert` returns false for a URL that was already collected.
        if !seen.insert(url_str) {
            continue;
        }
        if let Ok(url) = Url::parse(url_str) {
            let mut mirror = Mirror::new(url);
            if let Some(country) = Self::extract_country_from_hostname(url_str) {
                mirror.country = Some(country);
            }
            mirrors.push(mirror);
        }
    }

    Ok(mirrors)
}
/// Downloads the Launchpad archive-mirror feed and extracts the mirrors it
/// names.
///
/// # Errors
///
/// Fails when the request cannot be sent, when the body cannot be read, or
/// when parsing fails.
async fn fetch_ubuntu_mirrors(&self) -> Result<Vec<Mirror>> {
    let endpoint = "https://launchpad.net/ubuntu/+archivemirrors-rss";
    let body = Client::new()
        .get(endpoint)
        .send()
        .await
        .context("Failed to fetch Ubuntu mirror list")?
        .text()
        .await
        .context("Failed to read Ubuntu mirror list")?;
    self.parse_ubuntu_mirror_list(&body)
}
/// Extracts Ubuntu archive mirrors from the mirror feed.
///
/// Every `http(s)://…/ubuntu` URL found in `content` becomes one mirror, in
/// order of first appearance. URLs are de-duplicated after trailing slashes
/// are removed, and matches that do not parse as URLs are skipped. When
/// nothing usable is found, the built-in fallback list is returned instead.
///
/// # Errors
///
/// Returns an error only if the URL regex fails to compile.
fn parse_ubuntu_mirror_list(&self, content: &str) -> Result<Vec<Mirror>> {
    let re = Regex::new(r#"(?:https?://[^\s"<>]+/ubuntu/?)"#)?;
    let mut seen = std::collections::HashSet::new();
    let mut mirrors = Vec::new();

    for found in re.find_iter(content) {
        let url_str = found.as_str().trim_end_matches('/');
        // `insert` returns false for a URL that was already collected.
        if !seen.insert(url_str) {
            continue;
        }
        if let Ok(url) = Url::parse(url_str) {
            let mut mirror = Mirror::new(url);
            if let Some(country) = Self::extract_country_from_hostname(url_str) {
                mirror.country = Some(country);
            }
            mirrors.push(mirror);
        }
    }

    if mirrors.is_empty() {
        mirrors = self.get_fallback_ubuntu_mirrors();
    }
    Ok(mirrors)
}
/// Returns the hard-coded Ubuntu mirrors used when the downloaded feed
/// yields nothing. Entries that fail to parse as URLs are dropped.
fn get_fallback_ubuntu_mirrors(&self) -> Vec<Mirror> {
    const FALLBACK_URLS: [&str; 5] = [
        "http://archive.ubuntu.com/ubuntu/",
        "http://us.archive.ubuntu.com/ubuntu/",
        "http://security.ubuntu.com/ubuntu/",
        "http://mirrors.kernel.org/ubuntu/",
        "http://mirror.math.princeton.edu/pub/ubuntu/",
    ];

    let mut mirrors = Vec::with_capacity(FALLBACK_URLS.len());
    for candidate in FALLBACK_URLS.iter().copied() {
        if let Ok(url) = Url::parse(candidate) {
            mirrors.push(Mirror::new(url));
        }
    }
    mirrors
}
fn update_sources_list(&self, mirrors: &[Mirror]) -> Result<()> {
if mirrors.is_empty() {
anyhow::bail!("No mirrors provided for update");
}
let content = fs::read_to_string(&self.sources_file)
.context("Failed to read sources.list")?;
let sources = self.parse_source_lines(&content)?;
let best_mirror = &mirrors[0];
let new_url = best_mirror.url.as_str().trim_end_matches('/');
let mut new_content = String::new();
let mut current_line = 0;
for line in content.lines() {
let mut modified_line = line.to_string();
if let Some(source) = sources.iter().find(|s| s.line_number == current_line) {
if source.enabled && source.source_type == "deb" {
modified_line = Self::replace_url_in_line(line, new_url);
}
}
new_content.push_str(&modified_line);
new_content.push('\n');
current_line += 1;
}
let temp_file = self.sources_file.with_extension("tmp");
let mut file = fs::File::create(&temp_file)
.context("Failed to create temporary sources.list")?;
file.write_all(new_content.as_bytes())
.context("Failed to write temporary sources.list")?;
file.sync_all()
.context("Failed to sync temporary sources.list")?;
drop(file);
fs::rename(&temp_file, &self.sources_file)
.context("Failed to replace sources.list")?;
info!("Updated sources.list with new mirror: {}", new_url);
Ok(())
}
/// Returns `line` with the URL of its `deb` / `deb-src` entry replaced by
/// `new_url`.
///
/// The entry keyword, optional `[...]` options and everything after the URL
/// are preserved verbatim. A line that does not look like a source entry is
/// returned unchanged.
fn replace_url_in_line(line: &str, new_url: &str) -> String {
    // The pattern is a fixed literal, so compilation cannot fail at runtime.
    let pattern = Regex::new(
        r"(?x)
(^\s*(?:\#\s*)?(?:deb|deb-src)\s+(?:\[[^\]]+\]\s+)?) # Prefix
(\S+) # URL to replace
(\s+.*) # Suffix
",
    )
    .unwrap();

    match pattern.captures(line) {
        // Group 1 is the prefix, group 3 the suffix; group 2 (old URL) is dropped.
        Some(groups) => [&groups[1], new_url, &groups[3]].concat(),
        None => line.to_string(),
    }
}
/// Finds the most recently modified `sources.list.*` file in the backup
/// directory.
///
/// Entries whose modification time cannot be read rank as oldest.
///
/// # Errors
///
/// Fails when the backup directory does not exist, cannot be listed, or
/// contains no matching file.
fn get_latest_backup(&self) -> Result<PathBuf> {
    if !self.backup_dir.exists() {
        anyhow::bail!("No backups found");
    }

    let newest = fs::read_dir(&self.backup_dir)?
        .filter_map(|entry| entry.ok())
        .filter(|entry| {
            entry
                .file_name()
                .to_str()
                .map_or(false, |name| name.starts_with("sources.list."))
        })
        // `None` orders before any `Some(time)`, so unreadable entries lose.
        .max_by_key(|entry| entry.metadata().and_then(|meta| meta.modified()).ok());

    match newest {
        Some(entry) => Ok(entry.path()),
        None => Err(anyhow::anyhow!("No backup files found")),
    }
}
}
impl Default for AptHandler {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl DistroHandler for AptHandler {
/// Human-readable handler name, chosen by the detected distribution variant.
fn name(&self) -> &str {
    match self.distro_variant {
        AptVariant::Ubuntu => "Ubuntu APT",
        AptVariant::Debian => "Debian APT",
    }
}
/// Reports whether this host appears to use APT.
///
/// Requires the main sources file to exist and `apt --version` to be
/// launchable. Only the ability to spawn the process is checked; its exit
/// status is not inspected.
fn detect(&self) -> bool {
    if !self.sources_file.exists() {
        return false;
    }
    std::process::Command::new("apt")
        .arg("--version")
        .output()
        .is_ok()
}
/// Fetches the upstream mirror list that matches the detected variant.
///
/// # Errors
///
/// Propagates any failure from the variant-specific fetch.
async fn get_available_mirrors(&self) -> Result<Vec<Mirror>> {
    debug!("Fetching available mirrors for APT");
    let mirrors = match self.distro_variant {
        AptVariant::Ubuntu => self.fetch_ubuntu_mirrors().await?,
        AptVariant::Debian => self.fetch_debian_mirrors().await?,
    };
    Ok(mirrors)
}
/// Returns the mirrors currently configured in the sources file and the
/// `*.list` files of the sources directory.
///
/// # Errors
///
/// Propagates any failure from reading or parsing those files.
fn get_current_mirrors(&self) -> Result<Vec<Mirror>> {
    debug!("Reading current mirrors from sources.list");
    let current = self.parse_sources_list()?;
    Ok(current)
}
/// Points the APT sources at the first mirror in `mirrors`, with rollback.
///
/// The sequence is: check for root, back up the sources file, rewrite it,
/// then validate. If validation reports a problem, or cannot be run at all,
/// the backup is restored before the error is returned, so a failed update
/// never leaves the rewritten file in place.
///
/// # Errors
///
/// Fails when not running as root, when `mirrors` is empty, when the backup
/// or rewrite fails, or when validation fails or errors. A failure while
/// restoring the backup is returned in place of the validation error.
fn update_mirrors(&self, mirrors: &[Mirror]) -> Result<()> {
    info!("Updating APT mirrors");

    if !nix::unistd::geteuid().is_root() {
        anyhow::bail!("Root privileges required to update APT sources");
    }
    // Reject an empty list up front so no backup is created for a no-op.
    if mirrors.is_empty() {
        anyhow::bail!("No mirrors provided for update");
    }

    self.backup()?;
    self.update_sources_list(mirrors)?;

    let is_valid = match self.validate() {
        Ok(is_valid) => is_valid,
        Err(err) => {
            // Validation itself could not run; the new file is unverified, so roll back.
            warn!("Validation errored ({}), restoring backup", err);
            self.restore_backup()?;
            return Err(err.context("Mirror update validation failed"));
        }
    };

    if !is_valid {
        warn!("Validation failed, restoring backup");
        self.restore_backup()?;
        anyhow::bail!("Mirror update validation failed");
    }

    info!("Successfully updated APT mirrors");
    Ok(())
}
fn backup(&self) -> Result<()> {
debug!("Creating backup of sources.list");
fs::create_dir_all(&self.backup_dir)
.context("Failed to create backup directory")?;
let timestamp = chrono::Utc::now().timestamp();
let backup_file = self.backup_dir.join(format!("sources.list.{}", timestamp));
fs::copy(&self.sources_file, &backup_file)
.context("Failed to copy sources.list to backup")?;
info!("Created backup: {:?}", backup_file);
let mut backups: Vec<_> = fs::read_dir(&self.backup_dir)?
.filter_map(|e| e.ok())
.collect();
if backups.len() > 10 {
backups.sort_by_key(|e| e.metadata().and_then(|m| m.modified()).ok());
for entry in backups.iter().take(backups.len() - 10) {
let _ = fs::remove_file(entry.path());
}
}
Ok(())
}
/// Overwrites the sources file with the most recent backup.
///
/// # Errors
///
/// Fails when no backup can be found or when the copy fails.
fn restore_backup(&self) -> Result<()> {
    info!("Restoring sources.list from backup");

    let latest = self.get_latest_backup()?;
    fs::copy(&latest, &self.sources_file)
        .context("Failed to restore sources.list from backup")?;

    info!("Restored from backup: {:?}", latest);
    Ok(())
}
/// Checks that the current APT configuration is usable.
///
/// Returns `Ok(true)` only when `apt-cache policy` exits successfully and
/// the sources files yield at least one mirror. A failing command, a parse
/// error, or an empty mirror list is logged and reported as `Ok(false)`.
///
/// # Errors
///
/// Fails only when `apt-cache` cannot be launched.
fn validate(&self) -> Result<bool> {
    debug!("Validating APT configuration");

    let policy = std::process::Command::new("apt-cache")
        .arg("policy")
        .output()
        .context("Failed to run apt-cache policy")?;
    if !policy.status.success() {
        warn!("apt-cache policy failed: {}", String::from_utf8_lossy(&policy.stderr));
        return Ok(false);
    }

    let mirrors = match self.parse_sources_list() {
        Ok(mirrors) => mirrors,
        Err(e) => {
            warn!("Validation failed to parse sources: {}", e);
            return Ok(false);
        }
    };
    if mirrors.is_empty() {
        warn!("Validation found no mirrors");
        return Ok(false);
    }

    debug!("Validation successful, found {} mirrors", mirrors.len());
    Ok(true)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `content` and returns its only entry, asserting there is exactly one.
    fn parse_single(content: &str) -> SourceLine {
        let handler = AptHandler::new();
        let mut sources = handler.parse_source_lines(content).unwrap();
        assert_eq!(sources.len(), 1);
        sources.remove(0)
    }

    /// A plain entry is split into type, URL, distribution and components.
    #[test]
    fn test_parse_source_line_simple() {
        let source = parse_single("deb http://archive.ubuntu.com/ubuntu/ jammy main restricted");
        assert!(source.enabled);
        assert_eq!(source.source_type, "deb");
        assert_eq!(source.url, "http://archive.ubuntu.com/ubuntu/");
        assert_eq!(source.distribution, "jammy");
        assert_eq!(source.components, vec!["main", "restricted"]);
    }

    /// A bracketed option group is captured and does not leak into the URL.
    #[test]
    fn test_parse_source_line_with_options() {
        let source = parse_single("deb [arch=amd64,arm64] http://example.com/ubuntu jammy main");
        assert!(source.enabled);
        assert!(!source.options.is_empty());
        assert_eq!(source.url, "http://example.com/ubuntu");
    }

    /// An entry behind a `#` marker is parsed but reported as disabled.
    #[test]
    fn test_parse_commented_line() {
        let source = parse_single("# deb http://example.com/ubuntu jammy main");
        assert!(!source.enabled);
    }

    /// Replacing the URL keeps the distribution and components intact.
    #[test]
    fn test_replace_url_in_line() {
        let rewritten = AptHandler::replace_url_in_line(
            "deb http://old.com/ubuntu/ jammy main",
            "http://new.com/ubuntu",
        );
        assert!(rewritten.contains("http://new.com/ubuntu"));
        assert!(rewritten.contains("jammy main"));
    }

    /// Two-letter host labels are reported upper-cased, wherever they appear.
    #[test]
    fn test_extract_country_from_hostname() {
        let cases = [
            ("http://us.mirror.example.com/", "US"),
            ("http://mirror.uk.example.com/", "UK"),
        ];
        for (url, expected) in cases.iter() {
            assert_eq!(
                AptHandler::extract_country_from_hostname(url),
                Some(expected.to_string())
            );
        }
    }
}