use crate::{
config::Config,
error::{Result, ZeroTrustError},
types::{MigrationStatus},
};
use reqwest::Client;
use serde_json::{json, Value};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use base64::{Engine as _, engine::general_purpose};
/// Entry point for migration operations against the remote API.
///
/// The manager itself only holds shared handles; cloning it is cheap because
/// both fields are reference-counted.
#[derive(Debug, Clone)]
pub struct MigrationManager {
    /// Shared client configuration (API base URL, credentials).
    config: Arc<Config>,
    /// Shared HTTP client used for every request issued by this manager
    /// and by the builders it creates.
    http_client: Arc<Client>,
}
impl MigrationManager {
pub(crate) fn new(config: Arc<Config>, http_client: Arc<Client>) -> Self {
Self {
config,
http_client,
}
}
pub fn import_csv<P: AsRef<Path>>(&self, file_path: P) -> CsvImportBuilder {
CsvImportBuilder::new(
file_path.as_ref().to_path_buf(),
self.config.clone(),
self.http_client.clone(),
)
}
pub fn import_json<P: AsRef<Path>>(&self, file_path: P) -> JsonImportBuilder {
JsonImportBuilder::new(
file_path.as_ref().to_path_buf(),
self.config.clone(),
self.http_client.clone(),
)
}
pub fn export_csv<P: AsRef<Path>>(&self, file_path: P) -> CsvExportBuilder {
CsvExportBuilder::new(
file_path.as_ref().to_path_buf(),
self.config.clone(),
self.http_client.clone(),
)
}
pub fn export_json<P: AsRef<Path>>(&self, file_path: P) -> JsonExportBuilder {
JsonExportBuilder::new(
file_path.as_ref().to_path_buf(),
self.config.clone(),
self.http_client.clone(),
)
}
pub async fn status<S: AsRef<str>>(&self, migration_id: S) -> Result<MigrationStatus> {
self.ensure_authenticated()?;
let url = format!(
"{}/api/v1/migrations/{}",
self.config.api_url,
migration_id.as_ref()
);
let response = self.send_authenticated_request(
self.http_client.get(&url)
).await?;
if response.status().is_success() {
let status: MigrationStatus = response.json().await?;
Ok(status)
} else {
self.handle_error_response(response).await
}
}
pub async fn list(&self) -> Result<Vec<MigrationStatus>> {
self.ensure_authenticated()?;
let url = format!("{}/api/v1/migrations", self.config.api_url);
let response = self.send_authenticated_request(
self.http_client.get(&url)
).await?;
if response.status().is_success() {
let migrations: Vec<MigrationStatus> = response.json().await?;
Ok(migrations)
} else {
self.handle_error_response(response).await
}
}
pub async fn cancel<S: AsRef<str>>(&self, migration_id: S) -> Result<()> {
self.ensure_authenticated()?;
let url = format!(
"{}/api/v1/migrations/{}/cancel",
self.config.api_url,
migration_id.as_ref()
);
let response = self.send_authenticated_request(
self.http_client.post(&url)
).await?;
if response.status().is_success() {
Ok(())
} else {
self.handle_error_response(response).await
}
}
fn ensure_authenticated(&self) -> Result<()> {
if !self.config.is_authenticated() {
return Err(ZeroTrustError::auth("Authentication required"));
}
Ok(())
}
async fn send_authenticated_request(
&self,
mut request: reqwest::RequestBuilder,
) -> Result<reqwest::Response> {
if let Some(token) = &self.config.token {
request = request.header("Authorization", format!("Bearer {}", token));
}
let response = request
.header("Content-Type", "application/json")
.send()
.await?;
Ok(response)
}
async fn handle_error_response<T>(&self, response: reqwest::Response) -> Result<T> {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
match status.as_u16() {
401 => Err(ZeroTrustError::auth("Authentication failed")),
403 => Err(ZeroTrustError::permission_denied("Insufficient permissions")),
404 => Err(ZeroTrustError::not_found("Resource not found")),
400 => Err(ZeroTrustError::validation(error_text)),
500..=599 => Err(ZeroTrustError::server_error(status.as_u16(), error_text)),
_ => Err(ZeroTrustError::generic(format!("HTTP {}: {}", status, error_text))),
}
}
}
/// Builder for a CSV import request.
///
/// Collects the request options; the request is only issued by `execute`.
#[derive(Debug)]
pub struct CsvImportBuilder {
    /// Path of the local CSV file to upload.
    file_path: PathBuf,
    /// Shared client configuration (API base URL, credentials).
    config: Arc<Config>,
    /// Shared HTTP client used to send the request.
    http_client: Arc<Client>,
    /// Target database name; must be set before `execute`.
    database: Option<String>,
    /// Target table name; must be set before `execute`.
    table: Option<String>,
    /// Field delimiter sent with the request; defaults to `,`.
    delimiter: char,
    /// Whether the file has a header row; defaults to `true`.
    has_headers: bool,
    /// Optional batch size; only sent when set.
    batch_size: Option<usize>,
}
impl CsvImportBuilder {
    /// Creates a builder with no database/table selected, a `,` delimiter,
    /// headers enabled and no explicit batch size.
    fn new(file_path: PathBuf, config: Arc<Config>, http_client: Arc<Client>) -> Self {
        Self {
            file_path,
            config,
            http_client,
            database: None,
            table: None,
            delimiter: ',',
            has_headers: true,
            batch_size: None,
        }
    }

    /// Sets the database the data is imported into (required).
    pub fn to_database<S: Into<String>>(mut self, database: S) -> Self {
        self.database = Some(database.into());
        self
    }

    /// Sets the table the data is imported into (required).
    pub fn to_table<S: Into<String>>(mut self, table: S) -> Self {
        self.table = Some(table.into());
        self
    }

    /// Sets the field delimiter reported to the server.
    pub fn with_delimiter(mut self, delimiter: char) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// Sets whether the file's first row is a header row.
    pub fn with_headers(mut self, has_headers: bool) -> Self {
        self.has_headers = has_headers;
        self
    }

    /// Sets the batch size sent with the request.
    pub fn batch_size(mut self, size: usize) -> Self {
        self.batch_size = Some(size);
        self
    }

    /// Reads the file, base64-encodes it and posts it to the CSV import
    /// endpoint, returning the migration status decoded from the response.
    ///
    /// # Errors
    ///
    /// Returns a validation error when the database or table is unset, an
    /// authentication error when the configuration is not authenticated, an
    /// I/O error when the file cannot be read, a transport/decoding error
    /// from the HTTP client, or the error mapped from a non-success status.
    pub async fn execute(self) -> Result<MigrationStatus> {
        let database = self
            .database
            .as_deref()
            .ok_or_else(|| ZeroTrustError::validation("Database name is required"))?;
        let table = self
            .table
            .as_deref()
            .ok_or_else(|| ZeroTrustError::validation("Table name is required"))?;

        // Fail fast, like the manager's own requests do, instead of reading,
        // encoding and uploading the whole file without credentials.
        if !self.config.is_authenticated() {
            return Err(ZeroTrustError::auth("Authentication required"));
        }

        let file_content = fs::read(&self.file_path).await?;
        let encoded_content = general_purpose::STANDARD.encode(&file_content);
        let url = format!("{}/api/v1/migrations/import/csv", self.config.api_url);

        let mut payload = json!({
            "database": database,
            "table": table,
            "file_content": encoded_content,
            "delimiter": self.delimiter.to_string(),
            "has_headers": self.has_headers
        });
        if let Some(batch_size) = self.batch_size {
            payload["batch_size"] = json!(batch_size);
        }

        let response = self
            .send_authenticated_request(self.http_client.post(&url).json(&payload))
            .await?;
        if !response.status().is_success() {
            return self.handle_error_response(response).await;
        }
        let migration: MigrationStatus = response.json().await?;
        Ok(migration)
    }

    /// Attaches the bearer token (when one is configured) to `request` and
    /// sends it.
    ///
    /// No `Content-Type` header is added here: the request body is set with
    /// `.json(..)`, which already supplies `application/json`, and adding the
    /// header again would send it twice.
    async fn send_authenticated_request(
        &self,
        mut request: reqwest::RequestBuilder,
    ) -> Result<reqwest::Response> {
        if let Some(token) = &self.config.token {
            request = request.bearer_auth(token);
        }
        let response = request.send().await?;
        Ok(response)
    }

    /// Converts a non-success HTTP response into the matching error.
    ///
    /// 401, 403 and 404 map to fixed messages; 400 and 5xx carry the response
    /// body text; any other status becomes a generic error containing the
    /// status and body. A body that cannot be read is treated as empty.
    async fn handle_error_response<T>(&self, response: reqwest::Response) -> Result<T> {
        let status = response.status();
        let error_text = response.text().await.unwrap_or_default();
        match status.as_u16() {
            401 => Err(ZeroTrustError::auth("Authentication failed")),
            403 => Err(ZeroTrustError::permission_denied("Insufficient permissions")),
            404 => Err(ZeroTrustError::not_found("Resource not found")),
            400 => Err(ZeroTrustError::validation(error_text)),
            500..=599 => Err(ZeroTrustError::server_error(status.as_u16(), error_text)),
            _ => Err(ZeroTrustError::generic(format!("HTTP {}: {}", status, error_text))),
        }
    }
}
/// Builder for a JSON import request.
///
/// Collects the request options; the request is only issued by `execute`.
#[derive(Debug)]
pub struct JsonImportBuilder {
    /// Path of the local JSON file to upload.
    file_path: PathBuf,
    /// Shared client configuration (API base URL, credentials).
    config: Arc<Config>,
    /// Shared HTTP client used to send the request.
    http_client: Arc<Client>,
    /// Target database name; must be set before `execute`.
    database: Option<String>,
    /// Target table name; must be set before `execute`.
    table: Option<String>,
    /// Optional batch size; only sent when set.
    batch_size: Option<usize>,
}
impl JsonImportBuilder {
    /// Creates a builder with no database/table selected and no explicit
    /// batch size.
    fn new(file_path: PathBuf, config: Arc<Config>, http_client: Arc<Client>) -> Self {
        Self {
            file_path,
            config,
            http_client,
            database: None,
            table: None,
            batch_size: None,
        }
    }

    /// Sets the database the data is imported into (required).
    pub fn to_database<S: Into<String>>(mut self, database: S) -> Self {
        self.database = Some(database.into());
        self
    }

    /// Sets the table the data is imported into (required).
    pub fn to_table<S: Into<String>>(mut self, table: S) -> Self {
        self.table = Some(table.into());
        self
    }

    /// Sets the batch size sent with the request.
    pub fn batch_size(mut self, size: usize) -> Self {
        self.batch_size = Some(size);
        self
    }

    /// Reads the file, base64-encodes it and posts it to the JSON import
    /// endpoint, returning the migration status decoded from the response.
    ///
    /// # Errors
    ///
    /// Returns a validation error when the database or table is unset, an
    /// authentication error when the configuration is not authenticated, an
    /// I/O error when the file cannot be read, a transport/decoding error
    /// from the HTTP client, or the error mapped from a non-success status.
    pub async fn execute(self) -> Result<MigrationStatus> {
        let database = self
            .database
            .as_deref()
            .ok_or_else(|| ZeroTrustError::validation("Database name is required"))?;
        let table = self
            .table
            .as_deref()
            .ok_or_else(|| ZeroTrustError::validation("Table name is required"))?;

        // Fail fast, like the manager's own requests do, instead of reading,
        // encoding and uploading the whole file without credentials.
        if !self.config.is_authenticated() {
            return Err(ZeroTrustError::auth("Authentication required"));
        }

        let file_content = fs::read(&self.file_path).await?;
        let encoded_content = general_purpose::STANDARD.encode(&file_content);
        let url = format!("{}/api/v1/migrations/import/json", self.config.api_url);

        let mut payload = json!({
            "database": database,
            "table": table,
            "file_content": encoded_content
        });
        if let Some(batch_size) = self.batch_size {
            payload["batch_size"] = json!(batch_size);
        }

        let response = self
            .send_authenticated_request(self.http_client.post(&url).json(&payload))
            .await?;
        if !response.status().is_success() {
            return self.handle_error_response(response).await;
        }
        let migration: MigrationStatus = response.json().await?;
        Ok(migration)
    }

    /// Attaches the bearer token (when one is configured) to `request` and
    /// sends it.
    ///
    /// No `Content-Type` header is added here: the request body is set with
    /// `.json(..)`, which already supplies `application/json`, and adding the
    /// header again would send it twice.
    async fn send_authenticated_request(
        &self,
        mut request: reqwest::RequestBuilder,
    ) -> Result<reqwest::Response> {
        if let Some(token) = &self.config.token {
            request = request.bearer_auth(token);
        }
        let response = request.send().await?;
        Ok(response)
    }

    /// Converts a non-success HTTP response into the matching error.
    ///
    /// 401, 403 and 404 map to fixed messages; 400 and 5xx carry the response
    /// body text; any other status becomes a generic error containing the
    /// status and body. A body that cannot be read is treated as empty.
    async fn handle_error_response<T>(&self, response: reqwest::Response) -> Result<T> {
        let status = response.status();
        let error_text = response.text().await.unwrap_or_default();
        match status.as_u16() {
            401 => Err(ZeroTrustError::auth("Authentication failed")),
            403 => Err(ZeroTrustError::permission_denied("Insufficient permissions")),
            404 => Err(ZeroTrustError::not_found("Resource not found")),
            400 => Err(ZeroTrustError::validation(error_text)),
            500..=599 => Err(ZeroTrustError::server_error(status.as_u16(), error_text)),
            _ => Err(ZeroTrustError::generic(format!("HTTP {}: {}", status, error_text))),
        }
    }
}
/// Builder for a CSV export request.
///
/// Collects the request options; the request is only issued by `execute`.
#[derive(Debug)]
pub struct CsvExportBuilder {
    /// Local path the exported file content is written to.
    file_path: PathBuf,
    /// Shared client configuration (API base URL, credentials).
    config: Arc<Config>,
    /// Shared HTTP client used to send the request.
    http_client: Arc<Client>,
    /// Source database name; must be set before `execute`.
    database: Option<String>,
    /// Optional source table; only sent when set.
    table: Option<String>,
    /// Optional query; only sent when set.
    query: Option<String>,
    /// Field delimiter sent with the request; defaults to `,`.
    delimiter: char,
    /// Whether a header row is requested; defaults to `true`.
    include_headers: bool,
}
impl CsvExportBuilder {
fn new(file_path: PathBuf, config: Arc<Config>, http_client: Arc<Client>) -> Self {
Self {
file_path,
config,
http_client,
database: None,
table: None,
query: None,
delimiter: ',',
include_headers: true,
}
}
pub fn from_database<S: Into<String>>(mut self, database: S) -> Self {
self.database = Some(database.into());
self
}
pub fn from_table<S: Into<String>>(mut self, table: S) -> Self {
self.table = Some(table.into());
self
}
pub fn from_query<S: Into<String>>(mut self, query: S) -> Self {
self.query = Some(query.into());
self
}
pub fn with_delimiter(mut self, delimiter: char) -> Self {
self.delimiter = delimiter;
self
}
pub fn with_headers(mut self, include_headers: bool) -> Self {
self.include_headers = include_headers;
self
}
pub async fn execute(self) -> Result<MigrationStatus> {
let database = self.database.clone().ok_or_else(|| {
ZeroTrustError::validation("Database name is required")
})?;
let url = format!("{}/api/v1/migrations/export/csv", self.config.api_url);
let mut payload = json!({
"database": database,
"delimiter": self.delimiter.to_string(),
"include_headers": self.include_headers
});
if let Some(ref table) = self.table {
payload["table"] = json!(table);
}
if let Some(ref query) = self.query {
payload["query"] = json!(query);
}
let response = self.send_authenticated_request(
self.http_client
.post(&url)
.json(&payload)
).await?;
if response.status().is_success() {
let response_data: Value = response.json().await?;
if let Some(file_content) = response_data.get("file_content") {
let decoded_content = general_purpose::STANDARD.decode(file_content.as_str().unwrap_or(""))?;
fs::write(&self.file_path, decoded_content).await?;
}
let migration: MigrationStatus = serde_json::from_value(response_data["migration"].clone())?;
Ok(migration)
} else {
self.handle_error_response(response).await
}
}
async fn send_authenticated_request(
&self,
mut request: reqwest::RequestBuilder,
) -> Result<reqwest::Response> {
if let Some(token) = &self.config.token {
request = request.header("Authorization", format!("Bearer {}", token));
}
let response = request
.header("Content-Type", "application/json")
.send()
.await?;
Ok(response)
}
async fn handle_error_response<T>(&self, response: reqwest::Response) -> Result<T> {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
match status.as_u16() {
401 => Err(ZeroTrustError::auth("Authentication failed")),
403 => Err(ZeroTrustError::permission_denied("Insufficient permissions")),
404 => Err(ZeroTrustError::not_found("Resource not found")),
400 => Err(ZeroTrustError::validation(error_text)),
500..=599 => Err(ZeroTrustError::server_error(status.as_u16(), error_text)),
_ => Err(ZeroTrustError::generic(format!("HTTP {}: {}", status, error_text))),
}
}
}
/// Builder for a JSON export request.
///
/// Collects the request options; the request is only issued by `execute`.
#[derive(Debug)]
pub struct JsonExportBuilder {
    /// Local path the exported file content is written to.
    file_path: PathBuf,
    /// Shared client configuration (API base URL, credentials).
    config: Arc<Config>,
    /// Shared HTTP client used to send the request.
    http_client: Arc<Client>,
    /// Source database name; must be set before `execute`.
    database: Option<String>,
    /// Optional source table; only sent when set.
    table: Option<String>,
    /// Optional query; only sent when set.
    query: Option<String>,
    /// Value of the request's `pretty` flag; defaults to `false`.
    pretty: bool,
}
impl JsonExportBuilder {
fn new(file_path: PathBuf, config: Arc<Config>, http_client: Arc<Client>) -> Self {
Self {
file_path,
config,
http_client,
database: None,
table: None,
query: None,
pretty: false,
}
}
pub fn from_database<S: Into<String>>(mut self, database: S) -> Self {
self.database = Some(database.into());
self
}
pub fn from_table<S: Into<String>>(mut self, table: S) -> Self {
self.table = Some(table.into());
self
}
pub fn from_query<S: Into<String>>(mut self, query: S) -> Self {
self.query = Some(query.into());
self
}
pub fn pretty(mut self, pretty: bool) -> Self {
self.pretty = pretty;
self
}
pub async fn execute(self) -> Result<MigrationStatus> {
let database = self.database.clone().ok_or_else(|| {
ZeroTrustError::validation("Database name is required")
})?;
let url = format!("{}/api/v1/migrations/export/json", self.config.api_url);
let mut payload = json!({
"database": database,
"pretty": self.pretty
});
if let Some(ref table) = self.table {
payload["table"] = json!(table);
}
if let Some(ref query) = self.query {
payload["query"] = json!(query);
}
let response = self.send_authenticated_request(
self.http_client
.post(&url)
.json(&payload)
).await?;
if response.status().is_success() {
let response_data: Value = response.json().await?;
if let Some(file_content) = response_data.get("file_content") {
let decoded_content = general_purpose::STANDARD.decode(file_content.as_str().unwrap_or(""))?;
fs::write(&self.file_path, decoded_content).await?;
}
let migration: MigrationStatus = serde_json::from_value(response_data["migration"].clone())?;
Ok(migration)
} else {
self.handle_error_response(response).await
}
}
async fn send_authenticated_request(
&self,
mut request: reqwest::RequestBuilder,
) -> Result<reqwest::Response> {
if let Some(token) = &self.config.token {
request = request.header("Authorization", format!("Bearer {}", token));
}
let response = request
.header("Content-Type", "application/json")
.send()
.await?;
Ok(response)
}
async fn handle_error_response<T>(&self, response: reqwest::Response) -> Result<T> {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
match status.as_u16() {
401 => Err(ZeroTrustError::auth("Authentication failed")),
403 => Err(ZeroTrustError::permission_denied("Insufficient permissions")),
404 => Err(ZeroTrustError::not_found("Resource not found")),
400 => Err(ZeroTrustError::validation(error_text)),
500..=599 => Err(ZeroTrustError::server_error(status.as_u16(), error_text)),
_ => Err(ZeroTrustError::generic(format!("HTTP {}: {}", status, error_text))),
}
}
}