use anyhow::Result;
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio::io::AsyncWriteExt;
use warp::{Filter, Reply, Rejection};
use uuid::Uuid;
use blake3;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
pub listen_addr: String,
pub data: Vec<SystemConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemConfig {
pub system_name: String,
pub system_ip: String,
pub backup_dir: String,
pub auth_token: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct FileInfo {
pub filename: String,
pub blake3: String,
pub size: u64,
pub upload_time: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ApiResponse<T> {
pub success: bool,
pub message: String,
pub data: Option<T>,
}
pub struct ServerState {
pub config: ServerConfig,
pub file_registry: Arc<RwLock<HashMap<String, HashMap<String, FileInfo>>>>, pub chunked_uploads: Arc<RwLock<HashMap<String, ChunkedUploadInfo>>>, }
#[derive(Debug, Clone)]
pub struct ChunkedUploadInfo {
pub system_ip: String,
pub filename: String,
pub expected_blake3: String,
pub expected_size: u64,
pub chunk_size: u64,
pub temp_file_path: String,
pub received_chunks: HashMap<usize, u64>, pub total_received: u64,
}
impl ServerState {
pub fn new(config: ServerConfig) -> Self {
Self {
config,
file_registry: Arc::new(RwLock::new(HashMap::new())),
chunked_uploads: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn validate_token(&self, token: &str, system_ip: &str) -> bool {
self.config.data.iter().any(|sys| {
sys.system_ip == system_ip && sys.auth_token == token
})
}
pub fn get_backup_dir(&self, system_ip: &str) -> Option<String> {
self.config.data.iter()
.find(|sys| sys.system_ip == system_ip)
.map(|sys| sys.backup_dir.clone())
}
}
async fn load_config(config_path: &str) -> Result<ServerConfig> {
let config_content = fs::read_to_string(config_path)?;
let config: ServerConfig = serde_json::from_str(&config_content)?;
Ok(config)
}
async fn health_check() -> Result<impl Reply, Infallible> {
Ok(warp::reply::json(&ApiResponse {
success: true,
message: "Server is running".to_string(),
data: None::<String>,
}))
}
async fn upload_file(
state: Arc<ServerState>,
token: String,
system_ip: String,
filename: String,
blake3: String,
size: u64,
file_content: bytes::Bytes,
) -> Result<impl Reply, Rejection> {
if !state.validate_token(&token, &system_ip) {
error!("Invalid auth token for system: {}", system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Invalid authentication token".to_string(),
data: None,
}));
}
let backup_dir = match state.get_backup_dir(&system_ip) {
Some(dir) => dir,
None => {
error!("Backup directory not found for system: {}", system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "System not configured".to_string(),
data: None,
}));
}
};
if let Err(e) = fs::create_dir_all(&backup_dir) {
error!("Failed to create backup directory {}: {}", backup_dir, e);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: format!("Failed to create backup directory: {}", e),
data: None,
}));
}
if file_content.len() as u64 != size {
error!("File size mismatch for {} from system {}", filename, system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "File size mismatch".to_string(),
data: None,
}));
}
let computed_blake3 = blake3::hash(&file_content).to_hex().to_string();
if computed_blake3 != blake3 {
error!("BLAKE3 mismatch for {} from system {}", filename, system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "BLAKE3 verification failed".to_string(),
data: None,
}));
}
let file_path = Path::new(&backup_dir).join(&filename);
if let Err(e) = fs::write(&file_path, &file_content) {
error!("Failed to save file {}: {}", file_path.display(), e);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: format!("Failed to save file: {}", e),
data: None,
}));
}
let upload_time = chrono::Utc::now().to_rfc3339();
let file_info = FileInfo {
filename: filename.clone(),
blake3: blake3.clone(), size,
upload_time,
};
{
let mut registry = state.file_registry.write().await;
let system_files = registry.entry(system_ip.clone()).or_insert_with(HashMap::new);
system_files.insert(filename.clone(), file_info);
}
info!("File {} uploaded successfully from system {}", filename, system_ip);
Ok(warp::reply::json(&ApiResponse::<String> {
success: true,
message: "File uploaded successfully".to_string(),
data: None,
}))
}
async fn start_chunked_upload(
state: Arc<ServerState>,
token: String,
system_ip: String,
filename: String,
blake3: String,
size: u64,
chunk_size: u64,
) -> Result<impl Reply, Rejection> {
if !state.validate_token(&token, &system_ip) {
error!("Invalid auth token for system: {}", system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Invalid authentication token".to_string(),
data: None,
}));
}
let backup_dir = match state.get_backup_dir(&system_ip) {
Some(dir) => dir,
None => {
error!("Backup directory not found for system: {}", system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "System not configured".to_string(),
data: None,
}));
}
};
if let Err(e) = fs::create_dir_all(&backup_dir) {
error!("Failed to create backup directory {}: {}", backup_dir, e);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: format!("Failed to create backup directory: {}", e),
data: None,
}));
}
let upload_id = Uuid::new_v4().to_string();
let temp_file_path = Path::new(&backup_dir).join(format!("{}.tmp", upload_id));
let upload_info = ChunkedUploadInfo {
system_ip: system_ip.clone(),
filename: filename.clone(),
expected_blake3: blake3.clone(),
expected_size: size,
chunk_size,
temp_file_path: temp_file_path.to_string_lossy().to_string(),
received_chunks: HashMap::new(),
total_received: 0,
};
{
let mut uploads = state.chunked_uploads.write().await;
uploads.insert(upload_id.clone(), upload_info);
}
info!("Started chunked upload session {} for file {} from system {}", upload_id, filename, system_ip);
Ok(warp::reply::json(&ApiResponse {
success: true,
message: "Chunked upload session started".to_string(),
data: Some(upload_id),
}))
}
async fn upload_chunk(
state: Arc<ServerState>,
token: String,
upload_id: String,
filename: String,
chunk_number: usize,
chunk_data: bytes::Bytes,
) -> Result<impl Reply, Rejection> {
let system_ip = {
let uploads = state.chunked_uploads.read().await;
match uploads.get(&upload_id) {
Some(upload_info) => {
if upload_info.filename != filename {
error!("Filename mismatch for upload {}", upload_id);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Filename mismatch".to_string(),
data: None,
}));
}
upload_info.system_ip.clone()
}
None => {
error!("Invalid upload ID: {}", upload_id);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Invalid upload ID".to_string(),
data: None,
}));
}
}
};
if !state.validate_token(&token, &system_ip) {
error!("Invalid auth token for system: {}", system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Invalid authentication token".to_string(),
data: None,
}));
}
{
let mut uploads = state.chunked_uploads.write().await;
if let Some(upload_info) = uploads.get_mut(&upload_id) {
if upload_info.received_chunks.contains_key(&chunk_number) {
warn!("Chunk {} already received for upload {}", chunk_number, upload_id);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: true,
message: "Chunk already received".to_string(),
data: None,
}));
}
let mut file = match tokio::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&upload_info.temp_file_path)
.await
{
Ok(f) => f,
Err(e) => {
error!("Failed to open temp file for upload {}: {}", upload_id, e);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: format!("Failed to open temp file: {}", e),
data: None,
}));
}
};
if let Err(e) = file.write_all(&chunk_data).await {
error!("Failed to write chunk {} for upload {}: {}", chunk_number, upload_id, e);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: format!("Failed to write chunk: {}", e),
data: None,
}));
}
upload_info.received_chunks.insert(chunk_number, chunk_data.len() as u64);
upload_info.total_received += chunk_data.len() as u64;
info!("Received chunk {} for upload {} ({} bytes)", chunk_number, upload_id, chunk_data.len());
} else {
error!("Upload session not found: {}", upload_id);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Upload session not found".to_string(),
data: None,
}));
}
}
Ok(warp::reply::json(&ApiResponse::<String> {
success: true,
message: "Chunk uploaded successfully".to_string(),
data: None,
}))
}
async fn complete_upload(
state: Arc<ServerState>,
token: String,
upload_id: String,
filename: String,
) -> Result<impl Reply, Rejection> {
let upload_info = {
let uploads = state.chunked_uploads.read().await;
match uploads.get(&upload_id) {
Some(info) => {
if info.filename != filename {
error!("Filename mismatch for upload {}", upload_id);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Filename mismatch".to_string(),
data: None,
}));
}
info.clone()
}
None => {
error!("Upload session not found: {}", upload_id);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Upload session not found".to_string(),
data: None,
}));
}
}
};
if !state.validate_token(&token, &upload_info.system_ip) {
error!("Invalid auth token for system: {}", upload_info.system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Invalid authentication token".to_string(),
data: None,
}));
}
if upload_info.total_received != upload_info.expected_size {
error!("File size mismatch for upload {}. Expected: {}, Received: {}",
upload_id, upload_info.expected_size, upload_info.total_received);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "File size mismatch".to_string(),
data: None,
}));
}
let temp_file_content = match tokio::fs::read(&upload_info.temp_file_path).await {
Ok(content) => content,
Err(e) => {
error!("Failed to read temp file for upload {}: {}", upload_id, e);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: format!("Failed to read temp file: {}", e),
data: None,
}));
}
};
let computed_blake3 = blake3::hash(&temp_file_content).to_hex().to_string();
if computed_blake3 != upload_info.expected_blake3 {
error!("BLAKE3 mismatch for upload {}. Expected: {}, Computed: {}",
upload_id, upload_info.expected_blake3, computed_blake3);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "BLAKE3 verification failed".to_string(),
data: None,
}));
}
let backup_dir = match state.get_backup_dir(&upload_info.system_ip) {
Some(dir) => dir,
None => {
error!("Backup directory not found for system: {}", upload_info.system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "System not configured".to_string(),
data: None,
}));
}
};
let final_file_path = Path::new(&backup_dir).join(&filename);
if let Err(e) = tokio::fs::rename(&upload_info.temp_file_path, &final_file_path).await {
error!("Failed to move temp file for upload {}: {}", upload_id, e);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: format!("Failed to move temp file: {}", e),
data: None,
}));
}
let upload_time = chrono::Utc::now().to_rfc3339();
let file_info = FileInfo {
filename: filename.clone(),
blake3: upload_info.expected_blake3.clone(), size: upload_info.expected_size,
upload_time,
};
{
let mut registry = state.file_registry.write().await;
let system_files = registry.entry(upload_info.system_ip.clone()).or_insert_with(HashMap::new);
system_files.insert(filename.clone(), file_info);
}
{
let mut uploads = state.chunked_uploads.write().await;
uploads.remove(&upload_id);
}
info!("Completed chunked upload {} for file {} from system {}", upload_id, filename, upload_info.system_ip);
Ok(warp::reply::json(&ApiResponse::<String> {
success: true,
message: "File upload completed successfully".to_string(),
data: None,
}))
}
async fn query_blake3(
state: Arc<ServerState>,
token: String,
system_ip: String,
filename: String,
) -> Result<impl Reply, Rejection> {
if !state.validate_token(&token, &system_ip) {
error!("Invalid auth token for system: {}", system_ip);
return Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "Invalid authentication token".to_string(),
data: None,
}));
}
let registry = state.file_registry.read().await;
if let Some(system_files) = registry.get(&system_ip) {
if let Some(file_info) = system_files.get(&filename) {
return Ok(warp::reply::json(&ApiResponse {
success: true,
message: "File found".to_string(),
data: Some(file_info.blake3.clone()),
}));
}
}
if let Some(backup_dir) = state.get_backup_dir(&system_ip) {
let file_path = Path::new(&backup_dir).join(&filename);
if file_path.exists() {
if let Ok(content) = fs::read(&file_path) {
let blake3_hash = blake3::hash(&content).to_hex().to_string();
return Ok(warp::reply::json(&ApiResponse {
success: true,
message: "File found on disk".to_string(),
data: Some(blake3_hash),
}));
}
}
}
Ok(warp::reply::json(&ApiResponse::<String> {
success: false,
message: "File not found".to_string(),
data: None,
}))
}
#[tokio::main]
async fn main() -> Result<()> {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Info)
.init();
info!("Starting db-sync-server...");
let config = load_config("server.config.json").await?;
info!("Loaded configuration for {} systems", config.data.len());
let state = Arc::new(ServerState::new(config));
let listen_addr = state.config.listen_addr.clone();
let health_route = warp::path("health")
.and(warp::get())
.and_then(health_check);
let state_clone1 = state.clone();
let upload_route = warp::path("api")
.and(warp::path("upload"))
.and(warp::post())
.and(warp::header::<String>("authorization"))
.and(warp::query::<HashMap<String, String>>())
.and(warp::body::bytes())
.and(warp::any().map(move || state_clone1.clone()))
.and_then(|token: String, mut query: HashMap<String, String>, file_content: bytes::Bytes, state: Arc<ServerState>| async move {
let system_ip = query.remove("system_ip").unwrap_or_default();
let filename = query.remove("filename").unwrap_or_default();
let size = query.remove("size").and_then(|s| s.parse().ok()).unwrap_or(0);
let blake3 = query.remove("blake3").unwrap_or_default();
upload_file(state, token, system_ip, filename, blake3, size, file_content).await
});
let state_clone2 = state.clone();
let blake3_route = warp::path("api")
.and(warp::path("blake3"))
.and(warp::post())
.and(warp::header::<String>("authorization"))
.and(warp::query::<HashMap<String, String>>())
.and(warp::any().map(move || state_clone2.clone()))
.and_then(|token: String, mut query: HashMap<String, String>, state: Arc<ServerState>| async move {
let system_ip = query.remove("system_ip").unwrap_or_default();
let filename = query.remove("filename").unwrap_or_default();
query_blake3(state, token, system_ip, filename).await });
let state_clone3 = state.clone();
let start_chunked_route = warp::path("api")
.and(warp::path("upload-chunked"))
.and(warp::post())
.and(warp::header::<String>("authorization"))
.and(warp::query::<HashMap<String, String>>())
.and(warp::any().map(move || state_clone3.clone()))
.and_then(|token: String, mut query: HashMap<String, String>, state: Arc<ServerState>| async move {
let system_ip = query.remove("system_ip").unwrap_or_default();
let filename = query.remove("filename").unwrap_or_default();
let blake3 = query.remove("blake3").unwrap_or_default();
let size = query.remove("size").and_then(|s| s.parse().ok()).unwrap_or(0);
let chunk_size = query.remove("chunk_size").and_then(|s| s.parse().ok()).unwrap_or(0);
start_chunked_upload(state, token, system_ip, filename, blake3, size, chunk_size).await
});
let state_clone4 = state.clone();
let upload_chunk_route = warp::path("api")
.and(warp::path("upload-chunk"))
.and(warp::post())
.and(warp::header::<String>("authorization"))
.and(warp::query::<HashMap<String, String>>())
.and(warp::body::bytes())
.and(warp::any().map(move || state_clone4.clone()))
.and_then(|token: String, mut query: HashMap<String, String>, chunk_data: bytes::Bytes, state: Arc<ServerState>| async move {
let upload_id = query.remove("upload_id").unwrap_or_default();
let filename = query.remove("filename").unwrap_or_default();
let chunk_number = query.remove("chunk_number").and_then(|s| s.parse().ok()).unwrap_or(0);
upload_chunk(state, token, upload_id, filename, chunk_number, chunk_data).await
});
let state_clone5 = state.clone();
let complete_upload_route = warp::path("api")
.and(warp::path("complete-upload"))
.and(warp::post())
.and(warp::header::<String>("authorization"))
.and(warp::query::<HashMap<String, String>>())
.and(warp::any().map(move || state_clone5.clone()))
.and_then(|token: String, mut query: HashMap<String, String>, state: Arc<ServerState>| async move {
let upload_id = query.remove("upload_id").unwrap_or_default();
let filename = query.remove("filename").unwrap_or_default();
complete_upload(state, token, upload_id, filename).await
});
let routes = health_route
.or(upload_route)
.or(blake3_route)
.or(start_chunked_route)
.or(upload_chunk_route)
.or(complete_upload_route)
.with(warp::log("db_sync_server"));
info!("Server listening on: {}", listen_addr);
let addr: std::net::SocketAddr = listen_addr.parse()?;
warp::serve(routes).run(addr).await;
Ok(())
}