ax_core 0.3.2

Core library implementing the functions of ax
Documentation
use crate::{
    api::{
        filters::{authenticate, header_or_query_token},
        rejections::ApiError,
        NodeInfo,
    },
    balanced_or,
    swarm::blob_store::{BlobStore, BlobTooLarge},
};
use ax_types::AppId;
use bytes::Bytes;
use http::StatusCode;
use std::{borrow::Cow, convert::TryFrom};
use warp::{
    body::bytes,
    delete, get,
    header::{header, optional},
    http::Response,
    path::{self, Tail},
    post, put,
    reject::{self},
    Filter, Rejection, Reply,
};

fn content_length(limit: usize) -> impl Filter<Extract = (), Error = Rejection> + Clone {
    optional::<usize>("Content-Length")
        .and_then(move |cl| async move {
            let error = if let Some(cl) = cl {
                if cl <= limit {
                    return Ok(());
                } else {
                    ApiError::TooLarge { size: cl, limit }
                }
            } else {
                ApiError::LengthUnknown { limit }
            };
            tracing::warn!("rejecting upload: {}", error);
            Err(reject::custom(error))
        })
        .untuple_one()
}

pub(crate) fn routes(
    store: BlobStore,
    node_info: NodeInfo,
) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
    let f = authenticate(node_info, header_or_query_token())
        .and(path::param().and_then(|app: String| async move {
            AppId::try_from(&*app).map_err(|e| reject::custom(ApiError::BadRequest { cause: e.to_string() }))
        }))
        .and(path::tail())
        .and(warp::any().map(move || store.clone()));
    balanced_or!(
        get().and(f.clone()).and(header("Accept")).and_then(handle_get),
        delete().and(f.clone()).and_then(handle_delete),
        put()
            .and(f.clone())
            .and(content_length(10485760))
            .and(bytes())
            .and(header("Content-Type"))
            .and_then(handle_put),
        post().and(f).and_then(handle_post),
    )
}

async fn handle_get(
    app_id: AppId,
    target: AppId,
    tail: Tail,
    store: BlobStore,
    accept: String,
) -> Result<impl Reply, Rejection> {
    let app = if target.as_str() == "-" { app_id } else { target };
    let path = tail.as_str().to_owned();
    match store.blob_get(app.clone(), path) {
        Ok(Some((data, mime))) => {
            if accept.contains(&*mime) || accept.contains(mime_wild(&mime).as_ref()) || accept.contains("*/*") {
                Ok(Response::builder().header("Content-Type", mime).body(data))
            } else {
                Err(reject::custom(ApiError::NotAcceptable {
                    supported: mime,
                    requested: accept,
                }))
            }
        }
        Ok(None) => Ok(Response::builder().status(StatusCode::NO_CONTENT).body(vec![])),
        Err(err) => {
            tracing::error!("error while getting blob {}/{}: {}", app, tail.as_str(), err);
            Err(reject::custom(ApiError::Internal))
        }
    }
}

fn mime_wild(mime: &str) -> Cow<str> {
    if let Some(idx) = mime.find('/') {
        Cow::Owned(format!("{}/*", &mime[..idx]))
    } else {
        Cow::Borrowed(mime)
    }
}

async fn handle_delete(app_id: AppId, target: AppId, tail: Tail, store: BlobStore) -> Result<impl Reply, Rejection> {
    if target.as_str() != "-" {
        return Err(reject::custom(ApiError::BadRequest {
            cause: format!("cannot delete blob for specific appId {}", target),
        }));
    }
    let path = tail.as_str().to_owned();
    match store.blob_del(app_id.clone(), path) {
        Ok(_) => Ok(StatusCode::NO_CONTENT),
        Err(err) => {
            tracing::error!("error while deleting blob {}/{}: {}", app_id, tail.as_str(), err);
            Err(reject::custom(ApiError::Internal))
        }
    }
}

async fn handle_put(
    app_id: AppId,
    target: AppId,
    tail: Tail,
    store: BlobStore,
    bytes: Bytes,
    mime_type: String,
) -> Result<impl Reply, Rejection> {
    if target.as_str() != "-" {
        return Err(reject::custom(ApiError::BadRequest {
            cause: format!("cannot put blob for specific appId {}", target),
        }));
    }
    let path = tail.as_str().to_owned();
    match store.blob_put(app_id.clone(), path, mime_type, bytes.as_ref()) {
        Ok(_) => Ok(StatusCode::NO_CONTENT),
        Err(err) => {
            if let Some(err) = err.downcast_ref::<BlobTooLarge>() {
                tracing::warn!("error while putting blob {}/{}: {}", app_id, tail.as_str(), err);
                Err(reject::custom(ApiError::TooLarge {
                    size: err.size,
                    limit: err.limit,
                }))
            } else {
                tracing::error!("error while putting blob {}/{}: {}", app_id, tail.as_str(), err);
                Err(reject::custom(ApiError::Internal))
            }
        }
    }
}

async fn handle_post(_app_id: AppId, _target: AppId, _tail: Tail, _store: BlobStore) -> Result<impl Reply, Rejection> {
    Ok(StatusCode::NO_CONTENT)
}