use anyhow::{Result, anyhow};
use chrono::{DateTime, Local, Utc, Datelike};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::time::Duration;
use tokio::time::interval;
use blake3;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ClientConfig {
data: Vec<SystemConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SystemConfig {
system_name: String,
system_ip: String,
backup_server_url: String,
auth_token: String,
local_backup_dir: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct ApiResponse<T> {
success: bool,
message: String,
data: Option<T>,
}
struct SyncClient {
config: SystemConfig,
http_client: reqwest::Client,
}
impl SyncClient {
fn new(config: SystemConfig) -> Self {
let http_client = reqwest::Client::builder()
.timeout(Duration::from_secs(300)) .build()
.expect("Failed to create HTTP client");
Self {
config,
http_client,
}
}
async fn calculate_blake3(file_path: &Path) -> Result<String> {
use tokio::io::AsyncReadExt;
let mut file = tokio::fs::File::open(file_path).await?;
let mut hasher = blake3::Hasher::new();
let mut buffer = vec![0; 1024 * 1024];
loop {
let bytes_read = file.read(&mut buffer).await?;
if bytes_read == 0 {
break;
}
hasher.update(&buffer[..bytes_read]);
}
Ok(hasher.finalize().to_hex().to_string())
}
async fn get_remote_blake3(&self, filename: &str) -> Result<Option<String>> {
let url = format!("{}/api/blake3", self.config.backup_server_url);
let mut params = HashMap::new();
params.insert("system_ip", self.config.system_ip.clone());
params.insert("filename", filename.to_string());
let response = self.http_client
.post(&url)
.header("authorization", &self.config.auth_token)
.query(¶ms)
.send()
.await?;
if response.status().is_success() {
let api_response: ApiResponse<String> = response.json().await?;
if api_response.success {
Ok(api_response.data)
} else {
warn!("Server returned error for BLAKE3 query: {}", api_response.message);
Ok(None)
}
} else {
error!("Failed to query BLAKE3: {}", response.status());
Err(anyhow!("HTTP error: {}", response.status()))
}
}
async fn upload_file(&self, file_path: &Path) -> Result<()> {
let filename = file_path.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| anyhow!("Invalid filename"))?;
let file_size = tokio::fs::metadata(file_path).await?.len();
let blake3_hash = Self::calculate_blake3(file_path).await?;
const CHUNK_SIZE: u64 = 100 * 1024 * 1024;
if file_size > CHUNK_SIZE {
self.upload_file_chunked(file_path, filename, file_size, blake3_hash, CHUNK_SIZE).await
} else {
self.upload_file_single(file_path, filename, file_size, blake3_hash).await
}
}
async fn upload_file_single(&self, file_path: &Path, filename: &str, file_size: u64, blake3_hash: String) -> Result<()> {
let file_content = tokio::fs::read(file_path).await?;
let url = format!("{}/api/upload", self.config.backup_server_url);
let mut params = HashMap::new();
params.insert("system_ip", self.config.system_ip.clone());
params.insert("filename", filename.to_string());
params.insert("blake3", blake3_hash.clone());
params.insert("size", file_size.to_string());
let response = self.http_client
.post(&url)
.header("authorization", &self.config.auth_token)
.query(¶ms)
.body(file_content)
.send()
.await?;
if response.status().is_success() {
let api_response: ApiResponse<String> = response.json().await?;
if api_response.success {
info!("Successfully uploaded file: {}", filename);
Ok(())
} else {
Err(anyhow!("Upload failed: {}", api_response.message))
}
} else {
Err(anyhow!("Upload failed with status: {}", response.status()))
}
}
async fn upload_file_chunked(&self, file_path: &Path, filename: &str, file_size: u64, blake3_hash: String, chunk_size: u64) -> Result<()> {
let url = format!("{}/api/upload-chunked", self.config.backup_server_url);
let mut params = HashMap::new();
params.insert("system_ip", self.config.system_ip.clone());
params.insert("filename", filename.to_string());
params.insert("blake3", blake3_hash.clone());
params.insert("size", file_size.to_string());
params.insert("chunk_size", chunk_size.to_string());
let response = self.http_client
.post(&url)
.header("authorization", &self.config.auth_token)
.query(¶ms)
.send()
.await?;
if !response.status().is_success() {
return Err(anyhow!("Failed to start chunked upload: {}", response.status()));
}
let api_response: ApiResponse<String> = response.json().await?;
if !api_response.success {
return Err(anyhow!("Failed to start chunked upload: {}", api_response.message));
}
let upload_id = api_response.data.ok_or_else(|| anyhow!("No upload ID received"))?;
info!("Started chunked upload for {} with ID: {}", filename, upload_id);
use tokio::io::AsyncReadExt;
let mut file = tokio::fs::File::open(file_path).await?;
let mut chunk_number = 0;
let mut buffer = vec![0; chunk_size as usize];
loop {
let bytes_read = file.read(&mut buffer).await?;
if bytes_read == 0 {
break;
}
let chunk_data = buffer[..bytes_read].to_vec();
self.upload_chunk(&upload_id, filename, chunk_number, chunk_data).await?;
chunk_number += 1;
info!("Uploaded chunk {} for file {}", chunk_number, filename);
}
self.complete_upload(&upload_id, filename).await?;
info!("Completed chunked upload for file {}", filename);
Ok(())
}
async fn upload_chunk(&self, upload_id: &str, filename: &str, chunk_number: usize, chunk_data: Vec<u8>) -> Result<()> {
let url = format!("{}/api/upload-chunk", self.config.backup_server_url);
let mut params = HashMap::new();
params.insert("upload_id", upload_id.to_string());
params.insert("filename", filename.to_string());
params.insert("chunk_number", chunk_number.to_string());
let response = self.http_client
.post(&url)
.header("authorization", &self.config.auth_token)
.query(¶ms)
.body(chunk_data)
.send()
.await?;
if response.status().is_success() {
let api_response: ApiResponse<String> = response.json().await?;
if api_response.success {
Ok(())
} else {
Err(anyhow!("Chunk upload failed: {}", api_response.message))
}
} else {
Err(anyhow!("Chunk upload failed with status: {}", response.status()))
}
}
async fn complete_upload(&self, upload_id: &str, filename: &str) -> Result<()> {
let url = format!("{}/api/complete-upload", self.config.backup_server_url);
let mut params = HashMap::new();
params.insert("upload_id", upload_id.to_string());
params.insert("filename", filename.to_string());
let response = self.http_client
.post(&url)
.header("authorization", &self.config.auth_token)
.query(¶ms)
.send()
.await?;
if response.status().is_success() {
let api_response: ApiResponse<String> = response.json().await?;
if api_response.success {
Ok(())
} else {
Err(anyhow!("Upload completion failed: {}", api_response.message))
}
} else {
Err(anyhow!("Upload completion failed with status: {}", response.status()))
}
}
async fn sync_file(&self, file_path: &Path) -> Result<bool> {
let filename = file_path.file_name()
.and_then(|n| n.to_str())
.ok_or_else(|| anyhow!("Invalid filename"))?;
info!("Syncing file_path: {}", file_path.display());
info!("Syncing file: {}", filename);
if !file_path.exists() {
warn!("Local file not found: {:?}", file_path);
return Ok(false);
}
let local_blake3 = Self::calculate_blake3(file_path).await?;
match self.get_remote_blake3(filename).await {
Ok(Some(remote_blake3)) => {
if local_blake3 != remote_blake3 {
info!("BLAKE3 mismatch for {}, uploading new version", filename);
self.upload_file(file_path).await?;
Ok(true)
} else {
info!("File {} is already in sync", filename);
Ok(false)
}
}
Ok(None) => {
info!("Remote file not found, uploading: {}", filename);
self.upload_file(file_path).await?;
Ok(true)
}
Err(e) => {
error!("Failed to check remote BLAKE3 for {}: {}", filename, e);
Err(e)
}
}
}
async fn sync_all_files(&self) -> Result<usize> {
let backup_dir = Path::new(&self.config.local_backup_dir);
if !backup_dir.exists() {
warn!("Backup directory does not exist: {:?}", backup_dir);
return Ok(0);
}
let mut uploaded_count = 0;
let entries = fs::read_dir(backup_dir)?;
for entry in entries {
let entry = entry?;
let path = entry.path();
if path.is_file() {
match self.sync_file(&path).await {
Ok(true) => uploaded_count += 1,
Ok(false) => {}, Err(e) => {
error!("Failed to sync file {:?}: {}", path, e);
}
}
}
}
Ok(uploaded_count)
}
async fn check_server_health(&self) -> bool {
let url = format!("{}/health", self.config.backup_server_url);
match self.http_client.get(&url).send().await {
Ok(response) if response.status().is_success() => {
if let Ok(api_response) = response.json::<ApiResponse<serde_json::Value>>().await {
info!("Server health check: {}", api_response.message);
true
} else {
false
}
}
_ => false,
}
}
}
async fn load_config(config_path: &str) -> Result<ClientConfig> {
let config_content = fs::read_to_string(config_path)?;
let config: ClientConfig = serde_json::from_str(&config_content)?;
Ok(config)
}
fn is_monthly_upload_day() -> bool {
let now = Local::now();
now.day() >= 20
}
fn is_verification_day() -> bool {
let now = Local::now();
now.day() >= 20 && now.day() <= 26
}
async fn run_monthly_upload(client: &SyncClient) -> Result<()> {
info!("Running monthly upload for system: {}", client.config.system_name);
if !client.check_server_health().await {
error!("Server health check failed, skipping upload");
return Ok(());
}
match client.sync_all_files().await {
Ok(count) => {
info!("Monthly upload completed. {} files uploaded.", count);
}
Err(e) => {
error!("Monthly upload failed: {}", e);
}
}
Ok(())
}
async fn run_daily_verification(client: &SyncClient) -> Result<()> {
info!("Running daily verification for system: {}", client.config.system_name);
if !client.check_server_health().await {
error!("Server health check failed, skipping verification");
return Ok(());
}
match client.sync_all_files().await {
Ok(count) => {
if count > 0 {
info!("Daily verification completed. {} files needed re-upload.", count);
} else {
info!("Daily verification completed. All files are in sync.");
}
}
Err(e) => {
error!("Daily verification failed: {}", e);
}
}
Ok(())
}
async fn run_scheduler(config: ClientConfig) -> Result<()> {
info!("Starting db-sync client scheduler");
let mut last_monthly_upload = None::<DateTime<Utc>>;
let mut last_daily_verification = None::<DateTime<Utc>>;
let mut interval = interval(Duration::from_secs(3600));
loop {
interval.tick().await;
let now = Local::now();
if is_monthly_upload_day() {
let should_run = match last_monthly_upload {
None => true,
Some(last) => (now.date_naive() - last.date_naive()) > chrono::Duration::days(0),
};
if should_run {
for system_config in &config.data.clone() {
let client = SyncClient::new(system_config.clone());
if let Err(e) = run_monthly_upload(&client).await {
error!("Monthly upload failed for system {}: {}",
system_config.system_name, e);
}
}
last_monthly_upload = Some(now.with_timezone(&Utc));
}
}
if is_verification_day() {
let should_run = match last_daily_verification {
None => true,
Some(last) => (now.date_naive() - last.date_naive()) > chrono::Duration::days(0),
};
if should_run {
for system_config in &config.data.clone() {
let client = SyncClient::new(system_config.clone());
if let Err(e) = run_daily_verification(&client).await {
error!("Daily verification failed for system {}: {}",
system_config.system_name, e);
}
}
last_daily_verification = Some(now.with_timezone(&Utc));
}
}
info!("Scheduler tick: {}", now.format("%Y-%m-%d %H:%M:%S"));
}
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Info)
.init();
info!("Starting db-sync-client...");
let config = load_config("client.config.json").await?;
info!("Loaded configuration for {} systems", config.data.len());
run_scheduler(config).await?;
Ok(())
}