use axum::{
extract::{Path, State},
http::StatusCode,
Json,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use crate::cluster::{
CrossRegionConfig, DestinationMetrics, Region, RegionLocation, ReplicationFilter,
ReplicationMetrics,
};
use crate::AppState;
/// JSON request body accepted by `set_replication_config`.
///
/// The handler copies these values field-by-field into a `CrossRegionConfig`.
/// Every field carrying a serde `default` attribute may be omitted from the
/// payload; the fallback used in that case is noted on the field.
#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
pub struct SetReplicationConfigRequest {
/// Source region of the replication (required).
pub source_region: RegionInfo,
/// Destination regions of the replication (required; emptiness is not
/// checked by this type).
pub destination_regions: Vec<RegionInfo>,
/// Object filter; falls back to `ReplicationFilterInput::default()` when omitted.
#[serde(default)]
pub filter: ReplicationFilterInput,
/// WAN optimization flag; defaults to `true` (via `default_true`) when omitted.
#[serde(default = "default_true")]
pub wan_optimization: bool,
/// Compression level; defaults to 3 when omitted.
// NOTE(review): the valid range is not enforced here — confirm what the
// replication backend accepts.
#[serde(default = "default_compression_level")]
pub compression_level: i32,
/// Batch size; defaults to 100 when omitted.
// NOTE(review): presumably the number of items per batch — confirm.
#[serde(default = "default_batch_size")]
pub batch_size: usize,
/// Batch timeout in seconds; defaults to 10 when omitted. The handler turns
/// this into a `Duration` with `Duration::from_secs`.
#[serde(default = "default_batch_timeout")]
pub batch_timeout_secs: u64,
/// Bandwidth-limit switch; defaults to `false` when omitted.
#[serde(default)]
pub enable_bandwidth_limit: bool,
/// Bandwidth limit value; defaults to 0 when omitted.
// NOTE(review): the unit is presumably Mbps, judging by the name — confirm.
#[serde(default)]
pub bandwidth_limit_mbps: u64,
/// Replication priority; defaults to 50 when omitted.
#[serde(default = "default_priority")]
pub priority: u8,
/// Delete-replication flag; defaults to `true` when omitted.
#[serde(default = "default_true")]
pub replicate_deletes: bool,
/// Metadata-replication flag; defaults to `true` when omitted.
#[serde(default = "default_true")]
pub replicate_metadata: bool,
}
/// Serde default provider that yields `true`.
///
/// Referenced by the `wan_optimization`, `replicate_deletes` and
/// `replicate_metadata` fields of `SetReplicationConfigRequest`.
fn default_true() -> bool {
    const ENABLED_WHEN_OMITTED: bool = true;
    ENABLED_WHEN_OMITTED
}
/// Serde default provider for `SetReplicationConfigRequest::compression_level`.
///
/// Returns 3, the value used when the field is absent from the request body.
fn default_compression_level() -> i32 {
    const FALLBACK_LEVEL: i32 = 3;
    FALLBACK_LEVEL
}
/// Serde default provider for `SetReplicationConfigRequest::batch_size`.
///
/// Returns 100, the value used when the field is absent from the request body.
fn default_batch_size() -> usize {
    const FALLBACK_BATCH_SIZE: usize = 100;
    FALLBACK_BATCH_SIZE
}
/// Serde default provider for `SetReplicationConfigRequest::batch_timeout_secs`.
///
/// Returns 10 (seconds), the value used when the field is absent from the
/// request body.
fn default_batch_timeout() -> u64 {
    const FALLBACK_TIMEOUT_SECS: u64 = 10;
    FALLBACK_TIMEOUT_SECS
}
/// Serde default provider for `SetReplicationConfigRequest::priority`.
///
/// Returns 50, the value used when the field is absent from the request body.
fn default_priority() -> u8 {
    const FALLBACK_PRIORITY: u8 = 50;
    FALLBACK_PRIORITY
}
/// API-facing description of a region.
///
/// `set_replication_config` copies each field into a `Region` from
/// `crate::cluster` for both the source and every destination region.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct RegionInfo {
/// Region identifier.
pub id: String,
/// Region name.
pub name: String,
/// Geographic location of the region.
pub location: LocationInfo,
/// Local-region flag; defaults to `false` when omitted from the payload.
#[serde(default)]
pub is_local: bool,
}
/// Geographic location attached to a `RegionInfo`.
///
/// `set_replication_config` copies the three fields into a `RegionLocation`
/// from `crate::cluster`. All fields are required free-form strings; no format
/// is enforced by this type.
#[derive(Debug, Clone, Serialize, Deserialize, utoipa::ToSchema)]
pub struct LocationInfo {
/// Continent of the region.
pub continent: String,
/// Country of the region.
pub country: String,
/// City of the region.
pub city: String,
}
/// API-facing replication filter.
///
/// `set_replication_config` copies each field unchanged into a
/// `ReplicationFilter` from `crate::cluster`. The type derives `Default`, so an
/// omitted `filter` in the request yields empty collections and `None` sizes.
// NOTE(review): how the filter fields are combined when matching objects is
// decided by the replication backend and is not visible here.
#[derive(Debug, Clone, Serialize, Deserialize, Default, utoipa::ToSchema)]
pub struct ReplicationFilterInput {
/// Prefix filters; empty when omitted.
#[serde(default)]
pub prefix_filters: Vec<String>,
/// Tag filters as key/value pairs; empty when omitted.
#[serde(default)]
pub tag_filters: HashMap<String, String>,
/// Optional minimum size.
// NOTE(review): presumably a size in bytes — confirm.
pub min_size: Option<u64>,
/// Optional maximum size.
// NOTE(review): presumably a size in bytes — confirm.
pub max_size: Option<u64>,
/// Storage classes; empty when omitted.
#[serde(default)]
pub storage_classes: Vec<String>,
/// Prefixes to exclude; empty when omitted.
#[serde(default)]
pub exclude_prefixes: Vec<String>,
}
/// Generic acknowledgement body returned by the mutating handlers in this file
/// (`set_replication_config` and `delete_replication_config`).
#[derive(Debug, Serialize, utoipa::ToSchema)]
pub struct SuccessResponse {
/// Outcome flag; the handlers in this file always set it to `true`.
pub success: bool,
/// Human-readable message naming the affected bucket.
pub message: String,
}
pub async fn set_replication_config(
State(state): State<AppState>,
Path(bucket): Path<String>,
Json(req): Json<SetReplicationConfigRequest>,
) -> Result<Json<SuccessResponse>, StatusCode> {
let Some(manager) = &state.advanced_replication else {
return Err(StatusCode::NOT_IMPLEMENTED);
};
let source_region = Region {
id: req.source_region.id,
name: req.source_region.name,
location: RegionLocation {
continent: req.source_region.location.continent,
country: req.source_region.location.country,
city: req.source_region.location.city,
},
is_local: req.source_region.is_local,
};
let destination_regions = req
.destination_regions
.into_iter()
.map(|r| Region {
id: r.id,
name: r.name,
location: RegionLocation {
continent: r.location.continent,
country: r.location.country,
city: r.location.city,
},
is_local: r.is_local,
})
.collect();
let filter = ReplicationFilter {
prefix_filters: req.filter.prefix_filters,
tag_filters: req.filter.tag_filters,
min_size: req.filter.min_size,
max_size: req.filter.max_size,
storage_classes: req.filter.storage_classes,
exclude_prefixes: req.filter.exclude_prefixes,
};
let config = CrossRegionConfig {
source_region,
destination_regions,
filter,
wan_optimization: req.wan_optimization,
compression_level: req.compression_level,
batch_size: req.batch_size,
batch_timeout: std::time::Duration::from_secs(req.batch_timeout_secs),
enable_bandwidth_limit: req.enable_bandwidth_limit,
bandwidth_limit_mbps: req.bandwidth_limit_mbps,
priority: req.priority,
replicate_deletes: req.replicate_deletes,
replicate_metadata: req.replicate_metadata,
};
manager.set_bucket_config(&bucket, config).await;
Ok(Json(SuccessResponse {
success: true,
message: format!("Replication configured for bucket: {}", bucket),
}))
}
pub async fn get_replication_config(
State(state): State<AppState>,
Path(bucket): Path<String>,
) -> Result<Json<CrossRegionConfig>, StatusCode> {
let Some(manager) = &state.advanced_replication else {
return Err(StatusCode::NOT_IMPLEMENTED);
};
match manager.get_bucket_config(&bucket).await {
Some(config) => Ok(Json(config)),
None => Err(StatusCode::NOT_FOUND),
}
}
/// Handler that removes the replication configuration of a bucket.
///
/// Calls the manager's `remove_bucket_config` and then reports success. The
/// outcome of the removal is not inspected, so the response is the same
/// whether or not a configuration existed beforehand.
///
/// # Errors
///
/// Returns [`StatusCode::NOT_IMPLEMENTED`] when `state.advanced_replication`
/// is `None`.
pub async fn delete_replication_config(
    State(state): State<AppState>,
    Path(bucket): Path<String>,
) -> Result<Json<SuccessResponse>, StatusCode> {
    let manager = state
        .advanced_replication
        .as_ref()
        .ok_or(StatusCode::NOT_IMPLEMENTED)?;

    manager.remove_bucket_config(&bucket).await;

    let message = format!("Replication configuration removed for bucket: {}", bucket);
    Ok(Json(SuccessResponse {
        success: true,
        message,
    }))
}
/// Handler that returns the metrics reported by the manager's `get_metrics`.
///
/// # Errors
///
/// Returns [`StatusCode::NOT_IMPLEMENTED`] when `state.advanced_replication`
/// is `None`.
pub async fn get_replication_metrics(
    State(state): State<AppState>,
) -> Result<Json<ReplicationMetrics>, StatusCode> {
    let manager = state
        .advanced_replication
        .as_ref()
        .ok_or(StatusCode::NOT_IMPLEMENTED)?;

    Ok(Json(manager.get_metrics().await))
}
pub async fn get_destination_metrics(
State(state): State<AppState>,
Path(destination): Path<String>,
) -> Result<Json<DestinationMetrics>, StatusCode> {
let Some(manager) = &state.advanced_replication else {
return Err(StatusCode::NOT_IMPLEMENTED);
};
match manager.get_destination_metrics(&destination).await {
Some(metrics) => Ok(Json(metrics)),
None => Err(StatusCode::NOT_FOUND),
}
}
pub async fn flush_replication_batches(
State(state): State<AppState>,
) -> Result<Json<HashMap<String, usize>>, StatusCode> {
let Some(manager) = &state.advanced_replication else {
return Err(StatusCode::NOT_IMPLEMENTED);
};
match manager.flush_batches().await {
Ok(flushed) => Ok(Json(flushed)),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}