borgflux 0.1.0

BorgBackup and InfluxDB combined
Documentation
use config::{Config, ConfigError};
use reqwest::header::AUTHORIZATION;
use serde::Deserialize;
use serde_json::Value;
use std::error::Error;
use std::fmt;
use std::process::{Command, Stdio};
use std::time::SystemTime;

#[derive(Debug, Deserialize)]
struct BorgFluxConfig {
    influx_url: String,
    influx_token: String,
    influx_org: String,
    influx_bucket: String,
    hostname: String,
    borg_repository: String,
    borg_source_path: String,
}

impl BorgFluxConfig {
    pub fn new(file_path: &str) -> Result<Self, ConfigError> {
        let config = Config::builder()
            .add_source(config::File::with_name(file_path))
            .build()?;

        config.try_deserialize()
    }
}

struct InfluxTag {
    name: String,
    value: String,
}

struct InfluxField {
    name: String,
    value: InfluxFieldValue,
}

enum InfluxFieldValue {
    Float(f64),
    Int(i64),
    String(String),
}

struct InfluxPoint {
    measurement: String,
    tags: Vec<InfluxTag>,
    fields: Vec<InfluxField>,
}

impl fmt::Display for InfluxFieldValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Float(v) => write!(f, "{}", v),
            Self::Int(v) => write!(f, "{}", v),
            Self::String(v) => write!(f, "{}", v),
        }
    }
}

struct Backup {
    host: String,
    backup_name: String,
    encryption: String,
    repo_location: String,
    duration: f64,
    compressed_size: i64,
    deduplicated_size: i64,
    number_of_files: i64,
    original_size: i64,
}

pub fn run_borgflux(file_path: &str) {
    let env_output = read_config_file(file_path);

    if env_output.is_err() {
        eprintln!("Error: {}", env_output.err().unwrap());
        return;
    }

    let credentials = env_output.unwrap();

    send_frame_point(&credentials, "backup_start".to_string());

    let json_output = run_borg_backup(&credentials.borg_repository, &credentials.borg_source_path);

    if json_output.is_err() {
        send_error_point(&credentials, &json_output.to_owned().unwrap_err());
        return;
    }

    let json_value = read_borg_json_output(json_output.to_owned().unwrap());

    if json_value.is_err() {
        send_error_point(&credentials, "Couldn't read the JSON file correctly!");
        return;
    }

    let backup_stats = extract_data_from_json(json_value.unwrap(), credentials.hostname.to_owned());
    let influx_backup_point = create_influx_point_from_backup(backup_stats);

    write_to_influx(influx_backup_point, &credentials);

    send_frame_point(&credentials, "backup_end".to_string());
}

fn read_config_file(file_path: &str) -> Result<BorgFluxConfig, String> {
    let result = BorgFluxConfig::new(file_path);

    match result {
        Ok(config) => Ok(config),
        Err(_) => Err("Couldn't read the configuration file!".to_string()),
    }
}

fn read_borg_json_output(json_output: String) -> Result<Value, Box<dyn Error>> {
    let json_value: Value = serde_json::from_str(&json_output)?;

    return Ok(json_value);
}

fn extract_data_from_json(json_data: Value, host: String) -> Backup {
    Backup {
        host,
        backup_name: json_data["archive"]["name"].to_string(),
        encryption: json_data["encryption"]["mode"].to_string(),
        repo_location: json_data["repository"]["location"].to_string(),
        duration: json_data["archive"]["duration"].as_f64().unwrap(),
        compressed_size: json_data["archive"]["stats"]["compressed_size"]
            .as_i64()
            .unwrap(),
        deduplicated_size: json_data["archive"]["stats"]["deduplicated_size"]
            .as_i64()
            .unwrap(),
        number_of_files: json_data["archive"]["stats"]["nfiles"].as_i64().unwrap(),
        original_size: json_data["archive"]["stats"]["original_size"]
            .as_i64()
            .unwrap(),
    }
}

fn create_influx_point_from_backup(backup: Backup) -> InfluxPoint {
    let tags: Vec<InfluxTag> = vec![
        InfluxTag {
            name: "host".to_string(),
            value: backup.host,
        },
        InfluxTag {
            name: "backup_name".to_string(),
            value: backup.backup_name,
        },
        InfluxTag {
            name: "encryption".to_string(),
            value: backup.encryption,
        },
        InfluxTag {
            name: "repo_location".to_string(),
            value: backup.repo_location,
        },
    ];
    let fields: Vec<InfluxField> = vec![
        InfluxField {
            name: "duration".to_string(),
            value: InfluxFieldValue::Float(backup.duration),
        },
        InfluxField {
            name: "compressed_size".to_string(),
            value: InfluxFieldValue::Int(backup.compressed_size),
        },
        InfluxField {
            name: "deduplicated_size".to_string(),
            value: InfluxFieldValue::Int(backup.deduplicated_size),
        },
        InfluxField {
            name: "number_of_files".to_string(),
            value: InfluxFieldValue::Int(backup.number_of_files),
        },
        InfluxField {
            name: "original_size".to_string(),
            value: InfluxFieldValue::Int(backup.original_size),
        },
    ];

    InfluxPoint {
        measurement: "backup_data".to_string(),
        tags,
        fields,
    }
}

fn send_frame_point(credentials: &BorgFluxConfig, measurement: String) {
    let tags: Vec<InfluxTag> = vec![
        InfluxTag {
            name: "host".to_string(),
            value: credentials.hostname.to_owned(),
        },
        InfluxTag {
            name: "repository".to_string(),
            value: credentials.borg_repository.to_owned(),
        },
        InfluxTag {
            name: "source_path".to_string(),
            value: credentials.borg_source_path.to_owned(),
        },
    ];

    let fields: Vec<InfluxField> = vec![InfluxField {
        name: "dummy".to_string(),
        value: InfluxFieldValue::Int(0),
    }];

    let point = InfluxPoint {
        measurement,
        tags,
        fields,
    };

    write_to_influx(point, credentials);
}

fn send_error_point(credentials: &BorgFluxConfig, error_text: &str) {
    let tags: Vec<InfluxTag> = vec![
        InfluxTag {
            name: "host".to_string(),
            value: credentials.hostname.to_owned(),
        },
        InfluxTag {
            name: "repository".to_string(),
            value: credentials.borg_repository.to_owned(),
        },
        InfluxTag {
            name: "source_path".to_string(),
            value: credentials.borg_source_path.to_owned(),
        },
    ];

    let fields: Vec<InfluxField> = vec![InfluxField {
        name: "error".to_string(),
        value: InfluxFieldValue::String(error_text.to_string()),
    }];

    let point = InfluxPoint {
        measurement: "backup_error".to_string(),
        tags,
        fields,
    };

    eprintln!("Error: {}", error_text);
    write_to_influx(point, credentials);
}

fn build_raw_data_from_point(point: InfluxPoint) -> String {
    let mut raw_data = point.measurement;

    for tag in point.tags {
        raw_data = format!("{},{}={}", raw_data, tag.name, tag.value);
    }

    if point.fields.len() != 0 {
        raw_data = format!("{} ", raw_data);
    }

    let mut counter = 1;
    let number_of_fields = point.fields.len().to_owned();

    for field in point.fields {
        raw_data = format!("{}{}={}", raw_data, field.name, field.value);

        if counter < number_of_fields {
            raw_data = format!("{},", raw_data);
        }

        counter += 1;
    }

    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    format!("{} {}", raw_data, timestamp)
}

fn write_to_influx(point: InfluxPoint, credentials: &BorgFluxConfig) {
    let client = reqwest::blocking::Client::new();
    let body = build_raw_data_from_point(point);
    let api_url = format!(
        "{}/api/v2/write?bucket={}&org={}&precision=s",
        credentials.influx_url.to_owned(),
        credentials.influx_bucket.to_owned(),
        credentials.influx_org.to_owned()
    );

    let result = client
        .post(api_url)
        .body(body)
        .header(
            AUTHORIZATION,
            "Token ".to_owned() + &credentials.influx_token.to_owned(),
        )
        .send();

    if result.is_err() {
        send_error_point(&credentials, "Unable to send data to InfluxDB!");
    }
}

fn run_borg_backup(repository: &String, source_path: &String) -> Result<String, String> {
    let is_borg_installed = Command::new("which")
        .stdout(Stdio::null())
        .arg("borg")
        .status();

    if is_borg_installed.is_err() || !is_borg_installed.unwrap().success() {
        return Err("BorgBackup is not installed!".to_string());
    }

    let timestamp = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_secs()
        .to_string();

    let backup_repo = format!("{}::{}", repository.to_owned(), timestamp);

    let output = Command::new("borg")
        .arg("create")
        .arg("-v")
        .arg("--json")
        .arg(backup_repo)
        .arg(source_path)
        .output();

    match output {
        Ok(message) => Ok(String::from_utf8_lossy(&message.stdout).to_string()),
        Err(_error) => Err("Unable to run BorgBackup. Something went wrong!".to_string()),
    }
}