fuel-indexer-api-server 0.2.2

Fuel Indexer API Server
use crate::uses::{
    authorize_middleware, health_check, metrics, query_graph, register_index_assets,
    stop_index,
};
use async_std::sync::{Arc, RwLock};
use axum::{
    extract::{Extension, Json},
    http::StatusCode,
    middleware,
    response::{IntoResponse, Response},
    routing::{delete, get, post},
    Error as AxumError, Router,
};
use fuel_indexer_database::{queries, IndexerConnectionPool, IndexerDatabaseError};
use fuel_indexer_lib::{config::IndexerConfig, utils::ServiceRequest};
use fuel_indexer_schema::db::{
    graphql::GraphqlError, manager::SchemaManager, IndexerSchemaError,
};
use hyper::{Error as HyperError, Method};
use serde_json::json;
use std::{net::SocketAddr, time::Instant};
use thiserror::Error;
use tokio::sync::mpsc::{error::SendError, Sender};
use tower_http::{
    cors::{Any, CorsLayer},
    trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse, TraceLayer},
    LatencyUnit,
};
use tracing::{error, Level};

pub type ApiResult<T> = core::result::Result<T, ApiError>;

#[derive(Debug, Error)]
pub enum HttpError {
    #[error("Bad request.")]
    BadRequest,
    #[error("Unauthorized request.")]
    Unauthorized,
    #[error("Not not found. {0:#?}")]
    NotFound(String),
    #[error("Error.")]
    InternalServer,
}

#[derive(Debug, Error)]
pub enum ApiError {
    #[error("Query builder error {0:?}")]
    Graphql(#[from] GraphqlError),
    #[error("Serialization error {0:?}")]
    Serde(#[from] serde_json::Error),
    #[error("Database error {0:?}")]
    Database(#[from] IndexerDatabaseError),
    #[error("Sqlx error {0:?}")]
    Sqlx(#[from] sqlx::Error),
    #[error("Http error {0:?}")]
    Http(#[from] HttpError),
    #[error("Schema error {0:?}")]
    SchemaError(#[from] IndexerSchemaError),
    #[error("Channel send error: {0:?}")]
    ChannelSendError(#[from] SendError<ServiceRequest>),
    #[error("Axum error: {0:?}")]
    AxumError(#[from] AxumError),
    #[error("Hyper error: {0:?}")]
    HyperError(#[from] HyperError),
}

impl Default for ApiError {
    fn default() -> Self {
        ApiError::Http(HttpError::InternalServer)
    }
}

impl From<StatusCode> for ApiError {
    fn from(status: StatusCode) -> Self {
        match status {
            StatusCode::BAD_REQUEST => ApiError::Http(HttpError::BadRequest),
            StatusCode::UNAUTHORIZED => ApiError::Http(HttpError::Unauthorized),
            _ => ApiError::Http(HttpError::InternalServer),
        }
    }
}

impl IntoResponse for ApiError {
    fn into_response(self) -> Response {
        let generic_err_msg = "Internal server error.".to_string();
        // NOTE: Free to add more specific messaging/handing here as needed
        #[allow(clippy::match_single_binding)]
        let (status, err_msg) = match self {
            _ => (StatusCode::INTERNAL_SERVER_ERROR, generic_err_msg),
        };

        error!("{:?} - {}", status, err_msg);

        (
            status,
            Json(json!({
                "success": "false",
                "details": err_msg,
            })),
        )
            .into_response()
    }
}

pub struct GraphQlApi;

impl GraphQlApi {
    pub async fn build(
        config: IndexerConfig,
        pool: IndexerConnectionPool,
        tx: Option<Sender<ServiceRequest>>,
    ) -> ApiResult<Router> {
        let sm = SchemaManager::new(pool.clone());
        let schema_manager = Arc::new(RwLock::new(sm));
        let config = config.clone();
        let start_time = Arc::new(Instant::now());

        if config.graphql_api.run_migrations {
            let mut c = pool.acquire().await?;
            queries::run_migration(&mut c).await?;
        }

        let graph_route = Router::new()
            .route("/:namespace", post(query_graph))
            .layer(Extension(schema_manager.clone()))
            .layer(Extension(pool.clone()));

        let index_routes = Router::new()
            .route("/:namespace/:identifier", post(register_index_assets))
            .route_layer(middleware::from_fn(authorize_middleware))
            .layer(Extension(tx.clone()))
            .layer(Extension(schema_manager))
            .layer(Extension(pool.clone()))
            .route("/:namespace/:identifier", delete(stop_index))
            .route_layer(middleware::from_fn(authorize_middleware))
            .layer(Extension(tx))
            .layer(Extension(pool.clone()));

        let root_routes = Router::new()
            .route("/health", get(health_check))
            .layer(Extension(config))
            .layer(Extension(pool))
            .layer(Extension(start_time))
            .route("/metrics", get(metrics));

        let api_routes = Router::new()
            .nest("/", root_routes)
            .nest("/index", index_routes)
            .nest("/graph", graph_route);

        let app = Router::new()
            .nest("/api", api_routes)
            .layer(
                TraceLayer::new_for_http()
                    .make_span_with(DefaultMakeSpan::new().include_headers(true))
                    .on_request(DefaultOnRequest::new().level(Level::INFO))
                    .on_response(
                        DefaultOnResponse::new()
                            .level(Level::INFO)
                            .latency_unit(LatencyUnit::Micros),
                    ),
            )
            .layer(
                CorsLayer::new()
                    .allow_methods(vec![Method::GET, Method::POST])
                    .allow_origin(Any {}),
            );

        Ok(app)
    }

    pub async fn run(config: IndexerConfig, app: Router) -> ApiResult<()> {
        let listen_on: SocketAddr = config.graphql_api.into();

        axum::Server::bind(&listen_on)
            .serve(app.into_make_service())
            .await?;

        Ok(())
    }

    pub async fn build_and_run(
        config: IndexerConfig,
        pool: IndexerConnectionPool,
        tx: Option<Sender<ServiceRequest>>,
    ) -> ApiResult<()> {
        let listen_on: SocketAddr = config.graphql_api.clone().into();
        let app = GraphQlApi::build(config, pool, tx).await?;

        axum::Server::bind(&listen_on)
            .serve(app.into_make_service())
            .await?;

        Ok(())
    }
}