//! fileloft-actix 0.2.0
//!
//! actix-web integration for the tus.io resumable upload protocol.
//!
//! Documentation
use std::io::Cursor;
use std::sync::Arc;

use actix_web::http::header::{HeaderName, HeaderValue};
use actix_web::http::{Method, StatusCode};
use actix_web::web::{self};
use actix_web::{HttpRequest, HttpResponse};
use fileloft_core::{
    handler::{TusBody, TusHandler, TusRequest, TusResponse},
    lock::SendLocker,
    store::SendDataStore,
};
use futures_util::StreamExt;
use tokio_util::io::ReaderStream;

/// Builds the actix-web scope that exposes the tus endpoints.
///
/// The scope has an empty prefix and two routes, neither restricted to a
/// particular HTTP method:
///
/// * `""` is served by `dispatch` (no upload id).
/// * `"/{id}"` is served by `dispatch_with_id` (upload id taken from the path).
///
/// Both handlers extract `web::Data<Arc<TusHandler<S, L>>>`, so that value has
/// to be registered as application data. Register with
/// `App::new().app_data(handler).service(tus_scope::<S,L>())`.
pub fn tus_scope<S, L>() -> actix_web::Scope
where
    S: SendDataStore + Send + Sync + 'static,
    L: SendLocker + Send + Sync + 'static,
{
    let collection_route = web::route().to(dispatch::<S, L>);
    let upload_route = web::route().to(dispatch_with_id::<S, L>);

    web::scope("")
        .route("", collection_route)
        .route("/{id}", upload_route)
}

/// Handler for requests that hit the scope root, i.e. without an upload id.
///
/// Forwards the request, its payload and the shared `TusHandler` to
/// `handle_actix`, passing `None` as the upload id.
async fn dispatch<S, L>(
    req: HttpRequest,
    mut payload: web::Payload,
    handler: web::Data<Arc<TusHandler<S, L>>>,
) -> Result<HttpResponse, actix_web::Error>
where
    S: SendDataStore + Send + Sync + 'static,
    L: SendLocker + Send + Sync + 'static,
{
    let tus_handler = handler.get_ref();
    let no_upload_id = None;
    handle_actix(tus_handler, &req, &mut payload, no_upload_id).await
}

/// Handler for requests that hit `/{id}`.
///
/// Takes the `id` path segment as the upload id and forwards it, together with
/// the request, its payload and the shared `TusHandler`, to `handle_actix`.
async fn dispatch_with_id<S, L>(
    path: web::Path<String>,
    req: HttpRequest,
    mut payload: web::Payload,
    handler: web::Data<Arc<TusHandler<S, L>>>,
) -> Result<HttpResponse, actix_web::Error>
where
    S: SendDataStore + Send + Sync + 'static,
    L: SendLocker + Send + Sync + 'static,
{
    let upload_id = path.into_inner();
    let tus_handler = handler.get_ref();
    handle_actix(tus_handler, &req, &mut payload, Some(upload_id)).await
}

async fn handle_actix<S, L>(
    handler: &Arc<TusHandler<S, L>>,
    req: &HttpRequest,
    payload: &mut web::Payload,
    upload_id: Option<String>,
) -> Result<HttpResponse, actix_web::Error>
where
    S: SendDataStore + Send + Sync + 'static,
    L: SendLocker + Send + Sync + 'static,
{
    let method = actix_to_http_method(req.method());
    let uri = actix_to_http_uri(req);
    let headers = headers_to_http(req);

    let body = if matches!(
        req.method(),
        &Method::HEAD | &Method::DELETE | &Method::OPTIONS | &Method::GET
    ) {
        None
    } else {
        let mut buf = Vec::new();
        while let Some(chunk) = payload.next().await {
            let chunk = chunk?;
            buf.extend_from_slice(&chunk);
        }
        let reader: Box<dyn tokio::io::AsyncRead + Send + Sync + Unpin> =
            Box::new(Cursor::new(buf));
        Some(reader)
    };

    let tus_req = TusRequest {
        method,
        uri,
        upload_id,
        headers,
        body,
    };
    let tus = handler.handle(tus_req).await;
    Ok(map_response(tus))
}

/// Converts an actix-web `Method` into an `http::Method` by re-parsing its
/// textual form.
///
/// If the text is rejected by `http::Method::from_bytes`, `GET` is returned.
fn actix_to_http_method(m: &Method) -> http::Method {
    let raw = m.as_str().as_bytes();
    match http::Method::from_bytes(raw) {
        Ok(converted) => converted,
        Err(_) => http::Method::GET,
    }
}

/// Converts the request URI into an `http::Uri` by rendering it to a string
/// and parsing that string again.
///
/// If parsing fails, the static URI `/` is returned.
fn actix_to_http_uri(req: &HttpRequest) -> http::Uri {
    let rendered = req.uri().to_string();
    match rendered.parse::<http::Uri>() {
        Ok(uri) => uri,
        Err(_) => http::Uri::from_static("/"),
    }
}

/// Copies the request headers into a fresh `http::HeaderMap`.
///
/// Each name/value pair is rebuilt from its raw bytes. A pair is skipped when
/// either its name or its value fails to convert. Pairs are added with
/// `append`, so repeated header names keep all of their values.
fn headers_to_http(req: &HttpRequest) -> http::HeaderMap {
    let mut converted = http::HeaderMap::new();

    let pairs = req.headers().iter().filter_map(|(name, value)| {
        let name = http::header::HeaderName::from_bytes(name.as_str().as_bytes()).ok()?;
        let value = http::header::HeaderValue::from_bytes(value.as_bytes()).ok()?;
        Some((name, value))
    });

    for (name, value) in pairs {
        converted.append(name, value);
    }

    converted
}

/// Converts a `TusResponse` into an actix-web `HttpResponse`.
///
/// * The status code is carried over by its numeric value; if actix-web
///   rejects that value, `500 Internal Server Error` is used instead.
/// * Every header name/value pair is rebuilt from its raw bytes; a pair is
///   skipped when either part fails to convert.
/// * A `TusBody::Bytes` body is sent as-is; a `TusBody::Reader` body is
///   streamed, with read errors reported as internal server errors.
fn map_response(tus: TusResponse) -> HttpResponse {
    let status =
        StatusCode::from_u16(tus.status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
    let mut res = HttpResponse::build(status);
    for (k, v) in tus.headers.iter() {
        if let (Ok(name), Ok(val)) = (
            HeaderName::from_bytes(k.as_str().as_bytes()),
            HeaderValue::from_bytes(v.as_bytes()),
        ) {
            // `append_header` rather than `insert_header`: the source map
            // yields one entry per value, and inserting would replace any
            // earlier value that shares the same name, dropping all but the
            // last value of a multi-valued header.
            res.append_header((name, val));
        }
    }
    match tus.body {
        TusBody::Bytes(b) => res.body(b),
        TusBody::Reader(r) => {
            let stream = ReaderStream::new(r).map(|item| {
                item.map_err(|e| actix_web::error::ErrorInternalServerError(e.to_string()))
            });
            res.streaming(stream)
        }
    }
}