use std::collections::HashMap;
use axum::Router;
use axum::async_trait;
use axum::body::Bytes;
use axum::extract::{FromRequest, Path, Query, Request, State};
use axum::http::StatusCode;
use axum::response::Json;
use axum::routing::get;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::{Component, ValidationError, validate_value};
/// A component together with the schema that describes its data.
///
/// Derives `Serialize`/`Deserialize`, so it is the wire format used by the
/// HTTP handlers in this file for both request and response bodies.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ComponentDefinition {
/// The component this definition belongs to; the handlers in this file use
/// it as the key for lookups and deletions.
pub component: Component,
/// Schema for the component's data. Its structure is checked by
/// `validate_schema`, and data is checked against it by
/// `validate_component_data`.
pub schema: serde_json::Value,
}
impl ComponentDefinition {
pub fn new(component: Component, schema: Value) -> Self {
Self { component, schema }
}
pub fn validate_schema(&self) -> Result<(), ValidationError> {
validate_schema_structure(&self.schema)
}
pub fn validate_component_data(&self, data: &Value) -> Result<(), ValidationError> {
validate_value(data, &self.schema)
}
}
/// Newtype wrapper around a [`ComponentDefinition`] parsed from a request body.
///
/// Its `FromRequest` implementation decodes the body as YAML or JSON depending
/// on the request's `content-type` header.
pub struct ComponentDefinitionExtractor(pub ComponentDefinition);
#[async_trait]
impl<S> FromRequest<S> for ComponentDefinitionExtractor
where
    S: Send + Sync,
{
    type Rejection = (StatusCode, &'static str);

    /// Reads the whole request body and decodes it into a [`ComponentDefinition`].
    ///
    /// The body is parsed as YAML when the `content-type` header mentions
    /// `yaml` or `yml` (compared case-insensitively) and as JSON otherwise.
    /// A missing or non-UTF-8 header is treated as `application/json`.
    ///
    /// # Errors
    ///
    /// Rejects with `400 Bad Request` when the body cannot be read
    /// (`"failed to read request body"`) or cannot be decoded
    /// (`"invalid yaml"` / `"invalid json"`).
    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        // Media types are case-insensitive, so normalise before sniffing;
        // otherwise `Application/YAML` would be handed to the JSON parser.
        // The lowercased copy is owned, which releases the borrow on `req`
        // before the request is consumed below.
        let content_type = req
            .headers()
            .get("content-type")
            .and_then(|value| value.to_str().ok())
            .unwrap_or("application/json")
            .to_ascii_lowercase();

        let bytes = Bytes::from_request(req, state)
            .await
            .map_err(|_| (StatusCode::BAD_REQUEST, "failed to read request body"))?;

        let is_yaml = content_type.contains("yaml") || content_type.contains("yml");
        let definition = if is_yaml {
            serde_yml::from_slice::<ComponentDefinition>(&bytes)
                .map_err(|_| (StatusCode::BAD_REQUEST, "invalid yaml"))?
        } else {
            serde_json::from_slice::<ComponentDefinition>(&bytes)
                .map_err(|_| (StatusCode::BAD_REQUEST, "invalid json"))?
        };

        Ok(ComponentDefinitionExtractor(definition))
    }
}
/// Recursively checks that `schema` has a structure this module accepts.
///
/// Rules, in the order they are applied:
///
/// * The schema must be a JSON object.
/// * If it has a `oneOf` key, that value must be an array whose every element
///   is itself a valid schema. Nothing else on the object is inspected in
///   that case, including `type`.
/// * Otherwise it must have a string `type` that is one of `null`, `boolean`,
///   `integer`, `number`, `string`, `array` or `object`.
/// * For `array`, an `items` schema is validated when present.
/// * For `object`, `properties` must be an object when present, and each of
///   its values is validated as a schema.
///
/// # Errors
///
/// Returns `ValidationError::InvalidSchema` describing the first violation
/// found; errors from nested schemas are prefixed with the `oneOf` index or
/// property name they occurred under.
fn validate_schema_structure(schema: &Value) -> Result<(), ValidationError> {
    let Some(fields) = schema.as_object() else {
        return Err(ValidationError::InvalidSchema(
            "Schema must be an object".to_string(),
        ));
    };

    // `oneOf` short-circuits everything else on this object.
    if let Some(alternatives) = fields.get("oneOf") {
        let Some(alternatives) = alternatives.as_array() else {
            return Err(ValidationError::InvalidSchema(
                "oneOf must be an array".to_string(),
            ));
        };
        return alternatives
            .iter()
            .enumerate()
            .try_for_each(|(index, alternative)| {
                validate_schema_structure(alternative).map_err(|cause| {
                    ValidationError::InvalidSchema(format!(
                        "Invalid oneOf schema at index {}: {}",
                        index, cause
                    ))
                })
            });
    }

    let Some(declared_type) = fields.get("type") else {
        return Err(ValidationError::InvalidSchema(
            "Schema must have either 'type' or 'oneOf'".to_string(),
        ));
    };
    let Some(kind) = declared_type.as_str() else {
        return Err(ValidationError::InvalidSchema(
            "Schema type must be a string".to_string(),
        ));
    };

    match kind {
        "null" | "boolean" | "integer" | "number" | "string" => Ok(()),
        // An array without `items` places no constraint on its elements.
        "array" => match fields.get("items") {
            Some(item_schema) => validate_schema_structure(item_schema),
            None => Ok(()),
        },
        "object" => {
            let Some(properties) = fields.get("properties") else {
                return Ok(());
            };
            let Some(properties) = properties.as_object() else {
                return Err(ValidationError::InvalidSchema(
                    "Properties must be an object".to_string(),
                ));
            };
            for (name, property_schema) in properties {
                validate_schema_structure(property_schema).map_err(|cause| {
                    ValidationError::InvalidSchema(format!(
                        "Invalid property schema '{}': {}",
                        name, cause
                    ))
                })?;
            }
            Ok(())
        }
        unknown => Err(ValidationError::InvalidSchema(format!(
            "Unknown schema type: {}",
            unknown
        ))),
    }
}
/// `GET /componentdefinition` — returns every stored definition as a JSON array.
///
/// The query string is extracted into a map, but its contents are not used.
///
/// # Errors
///
/// Responds with `500 Internal Server Error` when the transaction cannot be
/// started, the listing fails, or the commit fails.
async fn get_component_definitions(
    State(pool): State<sqlx::PgPool>,
    Query(_params): Query<HashMap<String, String>>,
) -> Result<Json<Vec<ComponentDefinition>>, (StatusCode, &'static str)> {
    let mut tx = pool.begin().await.map_err(|_| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            "failed to begin transaction",
        )
    })?;

    let definitions = crate::sql::component_definition::list(&mut tx)
        .await
        .map_err(|_| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "failed to list component definitions",
            )
        })?;

    tx.commit().await.map_err(|_| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            "failed to commit transaction",
        )
    })?;

    Ok(Json(definitions))
}
async fn create_component_definition(
State(pool): State<sqlx::PgPool>,
ComponentDefinitionExtractor(definition): ComponentDefinitionExtractor,
) -> Result<Json<ComponentDefinition>, (StatusCode, &'static str)> {
if let Err(_e) = definition.validate_schema() {
return Err((StatusCode::BAD_REQUEST, "invalid schema"));
}
let mut tx = pool.begin().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to begin transaction",
)
})?;
match crate::sql::component_definition::create(&mut tx, &definition).await {
Ok(()) => {
tx.commit().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to commit transaction",
)
})?;
Ok(Json(definition))
}
Err(crate::DataStoreError::AlreadyExists) => Err((StatusCode::CONFLICT, "already exists")),
Err(_) => Err((StatusCode::INTERNAL_SERVER_ERROR, "internal server error")),
}
}
/// `PUT /componentdefinition` — updates the definition named in the request
/// body and echoes it back.
///
/// # Errors
///
/// * `400 Bad Request` when the definition's schema fails structural validation.
/// * `500 Internal Server Error` when the transaction cannot be started, the
///   update fails, or the commit fails.
async fn update_component_definition(
    State(pool): State<sqlx::PgPool>,
    ComponentDefinitionExtractor(definition): ComponentDefinitionExtractor,
) -> Result<Json<ComponentDefinition>, (StatusCode, &'static str)> {
    definition
        .validate_schema()
        .map_err(|_| (StatusCode::BAD_REQUEST, "invalid schema"))?;

    let mut tx = pool.begin().await.map_err(|_| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            "failed to begin transaction",
        )
    })?;

    // The success value of `update` is deliberately ignored.
    // NOTE(review): if it signals "no such row", this handler reports success
    // for a missing definition — confirm against the sql module.
    let _ = crate::sql::component_definition::update(&mut tx, &definition)
        .await
        .map_err(|_| {
            (
                StatusCode::INTERNAL_SERVER_ERROR,
                "failed to update component definition",
            )
        })?;

    tx.commit().await.map_err(|_| {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            "failed to commit transaction",
        )
    })?;

    Ok(Json(definition))
}
async fn patch_component_definition(
State(pool): State<sqlx::PgPool>,
Json(patch): Json<Value>,
) -> Result<Json<ComponentDefinition>, (StatusCode, &'static str)> {
let component = Component::new("PatchedComponent").unwrap();
let definition = ComponentDefinition {
component: component.clone(),
schema: patch.clone(),
};
if let Err(_e) = definition.validate_schema() {
return Err((StatusCode::BAD_REQUEST, "invalid schema"));
}
let mut tx = pool.begin().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to begin transaction",
)
})?;
match crate::sql::component_definition::update(&mut tx, &definition).await {
Ok(_) => {
tx.commit().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to commit transaction",
)
})?;
Ok(Json(definition))
}
Err(_) => Err((StatusCode::INTERNAL_SERVER_ERROR, "internal server error")),
}
}
async fn delete_component_definitions(
State(pool): State<sqlx::PgPool>,
) -> Result<StatusCode, (StatusCode, &'static str)> {
let mut tx = pool.begin().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to begin transaction",
)
})?;
let definitions = match crate::sql::component_definition::list(&mut tx).await {
Ok(defs) => defs,
Err(_) => return Err((StatusCode::INTERNAL_SERVER_ERROR, "internal server error")),
};
for definition in definitions {
if crate::sql::component_definition::delete(&mut tx, &definition.component)
.await
.is_err()
{
return Err((StatusCode::INTERNAL_SERVER_ERROR, "internal server error"));
}
}
tx.commit().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to commit transaction",
)
})?;
Ok(StatusCode::NO_CONTENT)
}
async fn get_component_definition_by_id(
State(pool): State<sqlx::PgPool>,
Path(id): Path<String>,
) -> Result<Json<ComponentDefinition>, (StatusCode, &'static str)> {
let component =
Component::new(&id).ok_or((StatusCode::BAD_REQUEST, "invalid component name"))?;
let mut tx = pool.begin().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to begin transaction",
)
})?;
match crate::sql::component_definition::get(&mut tx, &component).await {
Ok(Some(record)) => {
tx.commit().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to commit transaction",
)
})?;
Ok(Json(record.definition))
}
Ok(None) => Err((StatusCode::NOT_FOUND, "not found")),
Err(_) => Err((StatusCode::INTERNAL_SERVER_ERROR, "internal server error")),
}
}
async fn update_component_definition_by_id(
State(pool): State<sqlx::PgPool>,
Path(id): Path<String>,
ComponentDefinitionExtractor(definition): ComponentDefinitionExtractor,
) -> Result<Json<ComponentDefinition>, (StatusCode, &'static str)> {
let component =
Component::new(&id).ok_or((StatusCode::BAD_REQUEST, "invalid component name"))?;
if let Err(_e) = definition.validate_schema() {
return Err((StatusCode::BAD_REQUEST, "invalid schema"));
}
if component != definition.component {
return Err((StatusCode::BAD_REQUEST, "component name mismatch"));
}
let mut tx = pool.begin().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to begin transaction",
)
})?;
match crate::sql::component_definition::update(&mut tx, &definition).await {
Ok(_) => {
tx.commit().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to commit transaction",
)
})?;
Ok(Json(definition))
}
Err(_) => Err((StatusCode::INTERNAL_SERVER_ERROR, "internal server error")),
}
}
async fn patch_component_definition_by_id(
State(pool): State<sqlx::PgPool>,
Path(id): Path<String>,
Json(patch): Json<Value>,
) -> Result<Json<ComponentDefinition>, (StatusCode, &'static str)> {
let component =
Component::new(&id).ok_or((StatusCode::BAD_REQUEST, "invalid component name"))?;
let definition = ComponentDefinition {
component: component.clone(),
schema: patch.clone(),
};
if let Err(_e) = definition.validate_schema() {
return Err((StatusCode::BAD_REQUEST, "invalid schema"));
}
let mut tx = pool.begin().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to begin transaction",
)
})?;
match crate::sql::component_definition::update(&mut tx, &definition).await {
Ok(_) => {
tx.commit().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to commit transaction",
)
})?;
Ok(Json(definition))
}
Err(_) => Err((StatusCode::INTERNAL_SERVER_ERROR, "internal server error")),
}
}
async fn delete_component_definition_by_id(
State(pool): State<sqlx::PgPool>,
Path(id): Path<String>,
) -> Result<StatusCode, (StatusCode, &'static str)> {
let component =
Component::new(&id).ok_or((StatusCode::BAD_REQUEST, "invalid component name"))?;
let mut tx = pool.begin().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to begin transaction",
)
})?;
match crate::sql::component_definition::delete(&mut tx, &component).await {
Ok(true) => {
tx.commit().await.map_err(|_e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
"failed to commit transaction",
)
})?;
Ok(StatusCode::NO_CONTENT)
}
Ok(false) => Err((StatusCode::NOT_FOUND, "not found")),
Err(_) => Err((StatusCode::INTERNAL_SERVER_ERROR, "internal server error")),
}
}
/// Builds the router for the component-definition endpoints, using `pool` as
/// the shared state.
///
/// Routes:
///
/// * `/componentdefinition` — `GET`, `POST`, `PUT`, `PATCH`, `DELETE`
/// * `/componentdefinition/:id` — `GET`, `PUT`, `PATCH`, `DELETE`
pub fn create_component_definition_router(pool: sqlx::PgPool) -> Router {
    // Handlers for the collection as a whole.
    let collection_routes = get(get_component_definitions)
        .post(create_component_definition)
        .put(update_component_definition)
        .patch(patch_component_definition)
        .delete(delete_component_definitions);

    // Handlers for a single component addressed by its name.
    let item_routes = get(get_component_definition_by_id)
        .put(update_component_definition_by_id)
        .patch(patch_component_definition_by_id)
        .delete(delete_component_definition_by_id);

    Router::new()
        .route("/componentdefinition", collection_routes)
        .route("/componentdefinition/:id", item_routes)
        .with_state(pool)
}