netmito 0.6.8

A Unified Distributed Transport Evaluation Framework
Documentation
use axum::{
    extract::{Path, Query, State},
    middleware,
    routing::{delete, post},
    Extension, Json, Router,
};
use sea_orm::DbErr;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use crate::{
    config::{self, InfraPool},
    entity::{content::ArtifactContentType, state::UserState},
    error::{ApiError, ApiResult, Error},
    schema::*,
    service::{
        self,
        auth::{admin_auth_middleware, AuthAdminUser},
        name_validator,
    },
};

pub fn admin_router(st: InfraPool, cancel_token: CancellationToken) -> Router<InfraPool> {
    Router::new()
        .route("/users", post(admin_create_user))
        .route("/users/{username}", delete(admin_delete_user))
        .route(
            "/users/{username}/password",
            post(admin_change_user_password),
        )
        .route(
            "/users/{username}/group-quota",
            post(admin_change_user_group_quota),
        )
        .route("/users/{username}/state", post(admin_change_user_state))
        .route(
            "/groups/{group_name}/storage-quota",
            post(admin_change_group_storage_quota),
        )
        .route("/workers/{uuid}", delete(admin_shutdown_worker))
        .route(
            "/groups/{group_name}/attachments/{*key}",
            delete(admin_delete_attachment),
        )
        .route(
            "/tasks/{uuid}/artifacts/{content_type}",
            delete(admin_delete_artifact),
        )
        .route(
            "/shutdown",
            post(admin_shutdown_coordinator).with_state(cancel_token),
        )
        .route_layer(middleware::from_fn_with_state(
            st.clone(),
            admin_auth_middleware,
        ))
        .with_state(st)
}

pub async fn admin_create_user(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Json(CreateUserReq {
        username,
        md5_password,
        admin,
    }): Json<CreateUserReq>,
) -> ApiResult<()> {
    if !name_validator(&username) {
        Err(ApiError::InvalidRequest("Invalid username".to_string()))
    } else {
        match service::user::create_user(&pool.db, username.clone(), md5_password, admin).await {
            Ok(_) => Ok(()),
            Err(e) => match e {
                crate::error::Error::ApiError(e) => Err(e),
                crate::error::Error::DbError(DbErr::RecordNotInserted) => {
                    Err(ApiError::AlreadyExists(username))
                }
                _ => {
                    tracing::error!("{}", e);
                    Err(ApiError::InternalServerError)
                }
            },
        }
    }
}

pub async fn admin_delete_user(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Path(username): Path<String>,
) -> ApiResult<Json<UserStateResp>> {
    match service::user::change_user_state(&pool.db, username.clone(), UserState::Deleted).await {
        Ok(UserState::Deleted) => Ok(Json(UserStateResp {
            state: UserState::Deleted,
        })),
        Err(Error::DbError(DbErr::RecordNotUpdated)) => Err(ApiError::NotFound(format!(
            "User or group with name {}",
            username
        ))),
        Err(e) => {
            tracing::error!("{}", e);
            Err(ApiError::InternalServerError)
        }
        _ => Err(ApiError::InternalServerError),
    }
}

pub async fn admin_delete_attachment(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Path((group_name, key)): Path<(String, String)>,
) -> Result<(), ApiError> {
    service::s3::admin_delete_attachment(&pool, group_name, key)
        .await
        .map_err(|e| match e {
            crate::error::Error::AuthError(err) => ApiError::AuthError(err),
            crate::error::Error::ApiError(e) => e,
            _ => {
                tracing::error!("{}", e);
                ApiError::InternalServerError
            }
        })?;
    Ok(())
}

pub async fn admin_delete_artifact(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Path((uuid, content_type)): Path<(Uuid, ArtifactContentType)>,
) -> Result<(), ApiError> {
    service::s3::admin_delete_artifact(&pool, uuid, content_type)
        .await
        .map_err(|e| match e {
            crate::error::Error::AuthError(err) => ApiError::AuthError(err),
            crate::error::Error::ApiError(e) => e,
            _ => {
                tracing::error!("{}", e);
                ApiError::InternalServerError
            }
        })?;
    Ok(())
}

pub async fn admin_change_user_password(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Path(username): Path<String>,
    Json(req): Json<AdminChangePasswordReq>,
) -> ApiResult<()> {
    service::auth::admin_change_password(&pool.db, username, req.new_md5_password)
        .await
        .map_err(|e| match e {
            crate::error::Error::AuthError(err) => ApiError::AuthError(err),
            crate::error::Error::ApiError(e) => e,
            _ => {
                tracing::error!("{}", e);
                ApiError::InternalServerError
            }
        })?;
    Ok(())
}

pub async fn admin_change_user_state(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Path(username): Path<String>,
    Json(req): Json<ChangeUserStateReq>,
) -> ApiResult<Json<UserStateResp>> {
    match service::user::change_user_state(&pool.db, username.clone(), req.state).await {
        Ok(state) => Ok(Json(UserStateResp { state })),
        Err(Error::DbError(DbErr::RecordNotUpdated)) => Err(ApiError::NotFound(format!(
            "User or group with name {username}"
        ))),
        Err(e) => {
            tracing::error!("{}", e);
            Err(ApiError::InternalServerError)
        }
    }
}

pub async fn admin_change_group_storage_quota(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Path(group_name): Path<String>,
    Json(req): Json<ChangeGroupStorageQuotaReq>,
) -> ApiResult<Json<GroupStorageQuotaResp>> {
    let storage_quota =
        service::group::change_group_storage_quota(&pool, group_name, req.storage_quota)
            .await
            .map_err(|e| match e {
                crate::error::Error::AuthError(err) => ApiError::AuthError(err),
                crate::error::Error::ApiError(e) => e,
                _ => {
                    tracing::error!("{}", e);
                    ApiError::InternalServerError
                }
            })?;
    Ok(Json(GroupStorageQuotaResp { storage_quota }))
}

pub async fn admin_change_user_group_quota(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Path(username): Path<String>,
    Json(req): Json<ChangeUserGroupQuota>,
) -> ApiResult<Json<UserGroupQuotaResp>> {
    let group_quota = service::user::change_user_group_quota(&pool, username, req.group_quota)
        .await
        .map_err(|e| match e {
            crate::error::Error::AuthError(err) => ApiError::AuthError(err),
            crate::error::Error::ApiError(e) => e,
            _ => {
                tracing::error!("{}", e);
                ApiError::InternalServerError
            }
        })?;
    Ok(Json(UserGroupQuotaResp { group_quota }))
}

pub async fn admin_shutdown_worker(
    Extension(_): Extension<AuthAdminUser>,
    State(pool): State<InfraPool>,
    Path(uuid): Path<Uuid>,
    Query(op): Query<WorkerShutdown>,
) -> Result<(), ApiError> {
    let op = op.op.unwrap_or_default();
    tracing::debug!("Shutdown worker {} with op {:?}", uuid, op);
    service::worker::remove_worker_by_uuid(uuid, op, &pool)
        .await
        .map_err(|e| match e {
            crate::error::Error::AuthError(err) => ApiError::AuthError(err),
            crate::error::Error::ApiError(e) => e,
            _ => {
                tracing::error!("{}", e);
                ApiError::InternalServerError
            }
        })?;

    Ok(())
}

pub async fn admin_shutdown_coordinator(
    Extension(_): Extension<AuthAdminUser>,
    State(cancel_token): State<CancellationToken>,
    Json(req): Json<ShutdownReq>,
) -> Result<(), ApiError> {
    tracing::info!("Coordinator is requested to shutdown");
    if let Some(secret) = config::SHUTDOWN_SECRET.get() {
        if *secret == req.secret {
            tracing::info!("Coordinator shutdown initiated");
            cancel_token.cancel();
            Ok(())
        } else {
            Err(ApiError::AuthError(
                crate::error::AuthError::PermissionDenied,
            ))
        }
    } else {
        Err(ApiError::InternalServerError)
    }
}