ax_core 0.3.2

Core library implementing the functions of ax
Documentation
use super::ndjson;

use crate::{
    api::{events::service::EventService, rejections::ApiError, Result},
    runtime::features::FeatureError,
    swarm::event_store_ref,
};
use ax_types::{
    service::{PublishRequest, QueryRequest, SubscribeMonotonicRequest, SubscribeRequest},
    AppId,
};
use warp::{reply, Rejection, Reply};

pub async fn offsets(_app_id: AppId, event_service: EventService) -> Result<impl Reply> {
    event_service
        .offsets()
        .await
        .map(|reply| reply::json(&reply))
        .map(|reply| reply::with_header(reply, http::header::CACHE_CONTROL, "no-cache"))
        .map_err(reject)
}

pub async fn publish(app_id: AppId, request: PublishRequest, event_service: EventService) -> Result<impl Reply> {
    event_service
        .publish(app_id, request)
        .await
        .map(|reply| reply::json(&reply))
        .map_err(reject)
}

pub async fn query(app_id: AppId, request: QueryRequest, event_service: EventService) -> Result<impl Reply> {
    event_service
        .query(app_id, request)
        .await
        .map(|events| ndjson::reply(ndjson::keep_alive().stream(events)))
        .map_err(reject)
}

pub async fn subscribe(app_id: AppId, request: SubscribeRequest, event_service: EventService) -> Result<impl Reply> {
    event_service
        .subscribe(app_id, request)
        .await
        .map(|events| ndjson::reply(ndjson::keep_alive().stream(events)))
        .map_err(reject)
}

pub async fn subscribe_monotonic(
    app_id: AppId,
    request: SubscribeMonotonicRequest,
    event_service: EventService,
) -> Result<impl Reply> {
    event_service
        .subscribe_monotonic(app_id, request)
        .await
        .map(|events| ndjson::reply(ndjson::keep_alive().stream(events)))
        .map_err(reject)
}

fn reject(err: anyhow::Error) -> Rejection {
    if let Some(e) = err.downcast_ref::<event_store_ref::Error>() {
        let cause = e.to_string();
        return match e {
            event_store_ref::Error::Aborted => warp::reject::custom(ApiError::Shutdown { cause }),
            event_store_ref::Error::Overload => warp::reject::custom(ApiError::Overloaded { cause }),
            event_store_ref::Error::InvalidUpperBounds => warp::reject::custom(ApiError::BadRequest { cause }),
            event_store_ref::Error::TagExprError(_) => warp::reject::custom(ApiError::BadRequest { cause }),
        };
    }
    let err = match err.downcast::<ApiError>() {
        Ok(e) => return warp::reject::custom(e),
        Err(e) => e,
    };
    match err.downcast::<FeatureError>() {
        Ok(e) => warp::reject::custom(ApiError::from(e)),
        Err(err) => {
            tracing::warn!("internal error: {:?}", err);
            crate::api::reject(err)
        }
    }
}