use crate::datafold_node::config::DatabaseConfig;
use crate::log_feature;
use crate::logging::features::LogFeature;
use actix_web::{web, HttpResponse, Responder};
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::datafold_node::DataFoldNode;
use crate::server::http_server::AppState;
#[utoipa::path(
get,
path = "/api/system/status",
tag = "system",
responses(
(status = 200, description = "System status", body = serde_json::Value)
)
)]
pub async fn get_system_status(_state: web::Data<AppState>) -> impl Responder {
HttpResponse::Ok().json(json!({
"status": "running",
"uptime": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
"version": env!("CARGO_PKG_VERSION")
}))
}
#[utoipa::path(
get,
path = "/api/system/private-key",
tag = "system",
responses(
(status = 200, description = "Node private key", body = serde_json::Value)
)
)]
pub async fn get_node_private_key(state: web::Data<AppState>) -> impl Responder {
let node = state.node.read().await;
let private_key = node.get_node_private_key();
log_feature!(
LogFeature::HttpServer,
info,
"Node private key retrieved successfully"
);
HttpResponse::Ok().json(json!({
"success": true,
"private_key": private_key,
"message": "Node private key retrieved successfully"
}))
}
#[utoipa::path(
get,
path = "/api/system/public-key",
tag = "system",
responses(
(status = 200, description = "Node public key", body = serde_json::Value)
)
)]
pub async fn get_node_public_key(state: web::Data<AppState>) -> impl Responder {
let node = state.node.read().await;
let public_key = node.get_node_public_key();
log_feature!(
LogFeature::HttpServer,
info,
"Node public key retrieved successfully"
);
HttpResponse::Ok().json(json!({
"success": true,
"public_key": public_key,
"message": "Node public key retrieved successfully"
}))
}
#[derive(Deserialize, Serialize, utoipa::ToSchema)]
pub struct ResetDatabaseRequest {
pub confirm: bool,
}
#[derive(Serialize, utoipa::ToSchema)]
pub struct ResetDatabaseResponse {
pub success: bool,
pub message: String,
}
#[derive(Serialize, utoipa::ToSchema)]
pub struct ResetSchemaServiceResponse {
pub success: bool,
pub message: String,
}
#[utoipa::path(
post,
path = "/api/system/reset-database",
tag = "system",
request_body = ResetDatabaseRequest,
responses(
(status = 200, description = "Database reset result", body = ResetDatabaseResponse),
(status = 400, description = "Bad request", body = ResetDatabaseResponse),
(status = 500, description = "Server error", body = ResetDatabaseResponse)
)
)]
pub async fn reset_database(
state: web::Data<AppState>,
req: web::Json<ResetDatabaseRequest>,
) -> impl Responder {
if !req.confirm {
return HttpResponse::BadRequest().json(ResetDatabaseResponse {
success: false,
message: "Reset confirmation required. Set 'confirm' to true.".to_string(),
});
}
let temp_processor_node = state.node.read().await.clone();
let processor = crate::datafold_node::OperationProcessor::new(temp_processor_node);
if let Err(e) = processor.perform_database_reset(None).await {
log_feature!(
LogFeature::HttpServer,
error,
"Database reset operations failed: {}",
e
);
return HttpResponse::InternalServerError().json(ResetDatabaseResponse {
success: false,
message: format!("Database reset operations failed: {}", e),
});
}
let mut node_lock = state.node.write().await;
let config = node_lock.config.clone();
match DataFoldNode::new(config).await {
Ok(new_node) => {
*node_lock = new_node;
log_feature!(
LogFeature::HttpServer,
info,
"Database and schema service reset completed successfully"
);
HttpResponse::Ok().json(ResetDatabaseResponse {
success: true,
message:
"Database and schema service reset successfully. All data has been cleared."
.to_string(),
})
}
Err(e) => {
log_feature!(
LogFeature::HttpServer,
error,
"Database reset failed: {}",
e
);
HttpResponse::InternalServerError().json(ResetDatabaseResponse {
success: false,
message: format!("Database reset failed: {}", e),
})
}
}
}
#[utoipa::path(
post,
path = "/api/system/reset-schema-service",
tag = "system",
request_body = ResetDatabaseRequest,
responses(
(status = 200, description = "Schema service reset result", body = ResetSchemaServiceResponse),
(status = 400, description = "Bad request", body = ResetSchemaServiceResponse),
(status = 500, description = "Server error", body = ResetSchemaServiceResponse)
)
)]
pub async fn reset_schema_service(
state: web::Data<AppState>,
req: web::Json<ResetDatabaseRequest>,
) -> impl Responder {
if !req.confirm {
return HttpResponse::BadRequest().json(ResetSchemaServiceResponse {
success: false,
message: "Reset confirmation required. Set 'confirm' to true.".to_string(),
});
}
let node = state.node.read().await;
let schema_client = node.get_schema_client();
match schema_client.reset_schema_service().await {
Ok(()) => {
log_feature!(
LogFeature::HttpServer,
info,
"Schema service database reset completed successfully"
);
HttpResponse::Ok().json(ResetSchemaServiceResponse {
success: true,
message:
"Schema service database reset successfully. All schemas have been cleared."
.to_string(),
})
}
Err(e) => {
log_feature!(
LogFeature::HttpServer,
error,
"Schema service reset failed: {}",
e
);
HttpResponse::InternalServerError().json(ResetSchemaServiceResponse {
success: false,
message: format!("Schema service reset failed: {}", e),
})
}
}
}
#[derive(Deserialize, Serialize, utoipa::ToSchema, Debug, Clone)]
pub struct DatabaseConfigRequest {
pub database: DatabaseConfigDto,
}
#[derive(Deserialize, Serialize, utoipa::ToSchema, Debug, Clone)]
#[serde(tag = "type")]
pub enum DatabaseConfigDto {
#[serde(rename = "local")]
Local { path: String },
#[cfg(feature = "aws-backend")]
#[serde(rename = "dynamodb")]
DynamoDb(DynamoDbConfigDto),
}
#[derive(Deserialize, Serialize, utoipa::ToSchema, Debug, Clone, Default)]
pub struct ExplicitTablesDto {
pub main: String,
pub metadata: String,
pub permissions: String,
pub transforms: String,
pub orchestrator: String,
pub schema_states: String,
pub schemas: String,
pub public_keys: String,
pub transform_queue: String,
pub native_index: String,
pub process: String,
pub logs: String,
}
#[cfg(feature = "aws-backend")]
#[derive(Deserialize, Serialize, utoipa::ToSchema, Debug, Clone)]
pub struct DynamoDbConfigDto {
pub region: String,
pub tables: ExplicitTablesDto,
pub auto_create: bool,
pub user_id: Option<String>,
pub file_storage_bucket: Option<String>,
}
#[derive(Serialize, utoipa::ToSchema)]
pub struct DatabaseConfigResponse {
pub success: bool,
pub message: String,
pub requires_restart: bool,
}
#[utoipa::path(
get,
path = "/api/system/database-config",
tag = "system",
responses(
(status = 200, description = "Database configuration", body = DatabaseConfigDto)
)
)]
pub async fn get_database_config(state: web::Data<AppState>) -> impl Responder {
let node = state.node.read().await;
let config = &node.config;
let db_config = match &config.database {
DatabaseConfig::Local { path } => DatabaseConfigDto::Local {
path: path.to_string_lossy().to_string(),
},
#[cfg(feature = "aws-backend")]
DatabaseConfig::DynamoDb(config) => DatabaseConfigDto::DynamoDb(DynamoDbConfigDto {
region: config.region.clone(),
auto_create: config.auto_create,
user_id: config.user_id.clone(),
file_storage_bucket: config.file_storage_bucket.clone(),
tables: ExplicitTablesDto {
main: config.tables.main.clone(),
metadata: config.tables.metadata.clone(),
permissions: config.tables.permissions.clone(),
transforms: config.tables.transforms.clone(),
orchestrator: config.tables.orchestrator.clone(),
schema_states: config.tables.schema_states.clone(),
schemas: config.tables.schemas.clone(),
public_keys: config.tables.public_keys.clone(),
transform_queue: config.tables.transform_queue.clone(),
native_index: config.tables.native_index.clone(),
process: config.tables.process.clone(),
logs: config.tables.logs.clone(),
},
}),
};
HttpResponse::Ok().json(db_config)
}
#[utoipa::path(
post,
path = "/api/system/database-config",
tag = "system",
request_body = DatabaseConfigRequest,
responses(
(status = 200, description = "Configuration updated", body = DatabaseConfigResponse),
(status = 400, description = "Bad request", body = DatabaseConfigResponse),
(status = 500, description = "Server error", body = DatabaseConfigResponse)
)
)]
pub async fn update_database_config(
state: web::Data<AppState>,
req: web::Json<DatabaseConfigRequest>,
) -> impl Responder {
let node = state.node.read().await;
let mut config = node.config.clone();
let new_db_config = match &req.database {
DatabaseConfigDto::Local { path } => DatabaseConfig::Local {
path: std::path::PathBuf::from(path),
},
#[cfg(feature = "aws-backend")]
DatabaseConfigDto::DynamoDb(dto) => {
DatabaseConfig::DynamoDb(crate::storage::DynamoDbConfig {
region: dto.region.clone(),
auto_create: dto.auto_create,
user_id: dto.user_id.clone(),
file_storage_bucket: dto.file_storage_bucket.clone(),
tables: crate::storage::ExplicitTables {
main: dto.tables.main.clone(),
metadata: dto.tables.metadata.clone(),
permissions: dto.tables.permissions.clone(),
transforms: dto.tables.transforms.clone(),
orchestrator: dto.tables.orchestrator.clone(),
schema_states: dto.tables.schema_states.clone(),
schemas: dto.tables.schemas.clone(),
public_keys: dto.tables.public_keys.clone(),
transform_queue: dto.tables.transform_queue.clone(),
native_index: dto.tables.native_index.clone(),
process: dto.tables.process.clone(),
logs: dto.tables.logs.clone(),
},
})
}
};
config.database = new_db_config;
match &config.database {
DatabaseConfig::Local { path: _ } => {
}
#[cfg(feature = "aws-backend")]
DatabaseConfig::DynamoDb(_) => {
}
}
let temp_processor_node = state.node.read().await.clone();
let processor = crate::datafold_node::OperationProcessor::new(temp_processor_node);
let config_path =
std::env::var("NODE_CONFIG").unwrap_or_else(|_| "config/node_config.json".to_string());
let updated_config = match processor
.update_database_configuration(config.database)
.await
{
Ok(cfg) => cfg,
Err(e) => {
log_feature!(
LogFeature::HttpServer,
error,
"Failed to update database configuration: {}",
e
);
return HttpResponse::InternalServerError().json(DatabaseConfigResponse {
success: false,
message: format!("Failed to update database configuration: {}", e),
requires_restart: false,
});
}
};
match DataFoldNode::new(updated_config.clone()).await {
Ok(new_node) => {
let mut node = state.node.write().await;
*node = new_node;
log_feature!(
LogFeature::HttpServer,
info,
"Database configuration updated and node restarted successfully"
);
HttpResponse::Ok().json(DatabaseConfigResponse {
success: true,
message: "Database configuration updated and node restarted successfully."
.to_string(),
requires_restart: false,
})
}
Err(e) => {
log_feature!(
LogFeature::HttpServer,
error,
"Failed to recreate node with new database configuration: {}",
e
);
if let Ok(old_config) =
crate::datafold_node::config::load_node_config(Some(&config_path), None)
{
if let Ok(old_node) = DataFoldNode::new(old_config).await {
let mut node = state.node.write().await;
*node = old_node;
}
}
HttpResponse::InternalServerError().json(DatabaseConfigResponse {
success: false,
message: format!("Failed to restart node with new database configuration: {}. The previous configuration has been restored.", e),
requires_restart: false,
})
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::datafold_node::{DataFoldNode, NodeConfig};
use actix_web::test;
use std::sync::Arc;
use tempfile::tempdir;
async fn create_test_state(temp_dir: &tempfile::TempDir) -> web::Data<AppState> {
let config =
NodeConfig::new(temp_dir.path().to_path_buf()).with_schema_service_url("test://mock");
let node = DataFoldNode::new(config).await.unwrap();
web::Data::new(AppState {
node: Arc::new(tokio::sync::RwLock::new(node)),
})
}
#[tokio::test]
async fn test_system_status() {
let temp_dir = tempdir().unwrap();
let state = create_test_state(&temp_dir).await;
let req = test::TestRequest::get().to_http_request();
let resp = get_system_status(state).await.respond_to(&req);
assert_eq!(resp.status(), 200);
}
#[tokio::test]
async fn test_get_node_private_key() {
let temp_dir = tempdir().unwrap();
let state = create_test_state(&temp_dir).await;
let req = test::TestRequest::get().to_http_request();
let resp = get_node_private_key(state).await.respond_to(&req);
assert_eq!(resp.status(), 200);
let body = resp.into_body();
let bytes = actix_web::body::to_bytes(body).await.unwrap_or_default();
let response: serde_json::Value = serde_json::from_slice(&bytes).unwrap_or_default();
assert!(response["success"].as_bool().unwrap_or(false));
assert!(response["private_key"].as_str().is_some());
assert!(!response["private_key"].as_str().unwrap_or("").is_empty());
}
#[tokio::test]
async fn test_get_node_public_key() {
let temp_dir = tempdir().unwrap();
let state = create_test_state(&temp_dir).await;
let req = test::TestRequest::get().to_http_request();
let resp = get_node_public_key(state).await.respond_to(&req);
assert_eq!(resp.status(), 200);
let body = resp.into_body();
let bytes = actix_web::body::to_bytes(body).await.unwrap_or_default();
let response: serde_json::Value = serde_json::from_slice(&bytes).unwrap_or_default();
assert!(response["success"].as_bool().unwrap_or(false));
assert!(response["public_key"].as_str().is_some());
assert!(!response["public_key"].as_str().unwrap_or("").is_empty());
}
#[tokio::test]
async fn test_private_and_public_keys_are_different() {
let temp_dir = tempdir().unwrap();
let state = create_test_state(&temp_dir).await;
let req1 = test::TestRequest::get().to_http_request();
let resp1 = get_node_private_key(state.clone()).await.respond_to(&req1);
let body1 = resp1.into_body();
let bytes1 = actix_web::body::to_bytes(body1).await.unwrap_or_default();
let response1: serde_json::Value = serde_json::from_slice(&bytes1).unwrap_or_default();
let private_key = response1["private_key"].as_str().unwrap_or("");
let req2 = test::TestRequest::get().to_http_request();
let resp2 = get_node_public_key(state).await.respond_to(&req2);
let body2 = resp2.into_body();
let bytes2 = actix_web::body::to_bytes(body2).await.unwrap_or_default();
let response2: serde_json::Value = serde_json::from_slice(&bytes2).unwrap_or_default();
let public_key = response2["public_key"].as_str().unwrap_or("");
assert_ne!(private_key, public_key);
assert!(!private_key.is_empty());
assert!(!public_key.is_empty());
}
#[tokio::test]
async fn test_reset_database_without_confirmation() {
let temp_dir = tempdir().unwrap();
let state = create_test_state(&temp_dir).await;
let req_body = ResetDatabaseRequest { confirm: false };
let req = test::TestRequest::post()
.set_json(&req_body)
.to_http_request();
let resp = reset_database(state, web::Json(req_body))
.await
.respond_to(&req);
assert_eq!(resp.status(), 400);
}
#[tokio::test]
async fn test_reset_database_with_confirmation() {
let temp_dir = tempdir().unwrap();
let state = create_test_state(&temp_dir).await;
let req_body = ResetDatabaseRequest { confirm: true };
let req = test::TestRequest::post()
.set_json(&req_body)
.to_http_request();
let resp = reset_database(state, web::Json(req_body))
.await
.respond_to(&req);
assert!(resp.status() == 200 || resp.status() == 500);
if resp.status() == 500 {
}
}
}