manager_handlers 0.4.99989

This crate represents the implementation of manager capable of creating a microservice which has multiple handlers, which can be accesed via http from outside. Each handler can use the other ones via a bus to process the request. The handlers can have a number of replicas
Documentation
use actix_web::{
    web::{self, Data, Path},
    middleware::Next,
    body::MessageBody,
    dev::{ServiceRequest, ServiceResponse},
    HttpResponse, Responder, Error, HttpMessage,
};
use serde_json::json;
use std::collections::{HashMap, HashSet};
use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use tokio::{
    sync::{Semaphore, Notify, mpsc},
    io::AsyncReadExt,
    net::TcpStream,
    spawn,
    task::JoinHandle,
    time::timeout,
};
use tokio_rustls::TlsStream;
use tokio_stream::wrappers::ReceiverStream;
use futures_util::StreamExt;
use async_stream::stream;
use bytes::Bytes;
use x509_parser::prelude::*;
use crate::{
    manager::{Base, Config, SharedState},
    multibus::MultiBus,
};

pub async fn authentication_middleware(req: ServiceRequest, next: Next<impl MessageBody>) -> Result<ServiceResponse<impl MessageBody>, Error> {
    let config = req
        .app_data::<Data<Config>>()
        .map(|c| c.get_ref().clone());

    if let Some(config) = config {
        if let Some(auth_header) = req.headers().get("Authorization") {
            if auth_header.to_str().unwrap_or("") != config.api_key {
                return Err(actix_web::error::ErrorUnauthorized("Invalid API Key"));
            }
        } else {
            return Err(actix_web::error::ErrorUnauthorized("Missing API Key"));
        }

        let allowed_names = req
            .app_data::<Data<Arc<Option<HashSet<String>>>>>()
            .map(|c| c.get_ref().clone())
            .unwrap_or_default();

        if let Some(allowed_cns) = allowed_names.clone().as_ref() {
            if let Some(tls_stream) = req.extensions().get::<Arc<TlsStream<TcpStream>>>().cloned() {
                if let Some(cert) = tls_stream.get_ref().1.peer_certificates().and_then(|certs| certs.get(0)) {
                    let (_, x509) = X509Certificate::from_der(cert.as_ref()).ok().unwrap();
                    for attr in x509.subject().iter_common_name() {
                        if let Ok(cn) = attr.as_str() {
                            if !allowed_cns.contains(cn) {
                                return Err(actix_web::error::ErrorForbidden("Forbidden: CN not allowed"));
                            }
                        } else {
                            return Err(actix_web::error::ErrorForbidden("Forbidden: Missing CN"));
                        }
                    }
                }
            }
        }
    }
    next.call(req).await
}

pub async fn unauthorized() -> impl Responder {
    HttpResponse::Unauthorized().json(json!({"message": "Unauthorized: Access is denied"}))
}

pub async fn shutdown_from_http(shutdown_flag: Data<AtomicBool>, manager: Data<Arc<Notify>>) -> impl Responder {
    shutdown_flag.store(true, Ordering::SeqCst);
    manager.notify_waiters();
    HttpResponse::Ok().json(json!({"status": "Shutting down"}))
}

async fn handle_common_ending(processor: JoinHandle<Result<String, Box<dyn std::error::Error + Send + Sync>>>, manager: Arc<Notify>, str_handler: String) -> HttpResponse {
    tokio::select! {
        result = processor => {
            println!("Processed a request to handler {}", str_handler);
            match result {
                Ok(result) => {
                    match result {
                        Ok(output) =>  HttpResponse::Ok().json(json!({"status": "Ok", "message": output})),
                        Err(e) =>  HttpResponse::InternalServerError().json(json!({"status": "Error", "message": e.to_string()})),
                    }
                }
                Err(e) => {
                   HttpResponse::InternalServerError().json(json!({"status": "Error", "message": e.to_string()}))
                }
            }
        },
        _ = manager.notified() => {
            println!("Service is stopped forcefully, request aborted");
            HttpResponse::InternalServerError().json(json!({"status": "Error", "message":  "Service closed forcefully"}))
        }
    }
}

/// Handles HTTP requests to process specific handlers.
///
/// # Arguments
/// * `handler_name` - The name of the handler to process the request.
/// * `body` - The request body containing the data to be processed.
/// * `permits` - The semaphore controlling concurrency (only one request at a time).
/// * `instance` - The registered handlers mapped by name.
/// * `manager` - The shutdown notifier.
/// * `communication_line` - The communication line (MultiBus) for message exchange.
/// * `shared_state` - The variable used to access the shared state
///
/// # Returns
/// * A JSON response indicating the result of the handler's processing.

pub async fn process_request(path: Path<String>, mut payload: web::Payload, permits: Data<Arc<Semaphore>>, instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>, manager: Data<Arc<Notify>>, communication_line: Data<Arc<MultiBus>>, shared_state: Data<Arc<SharedState>>, request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>) -> impl Responder {
    let handler_name = path.into_inner();
    let str_handler = handler_name.to_string();
    let str_handler_copy = str_handler.clone();
    let permits_clone = permits.as_ref().clone();
    let permits_per_handler_clone = request_max_per_handler.as_ref().clone().get(&handler_name).unwrap().clone();
    let communication_line_clone = communication_line.as_ref().clone();
    let shared_state_clone = shared_state.as_ref().clone();
    let data_str = String::from_utf8_lossy(&payload.to_bytes().await.unwrap()).to_string();
    let instance_to_run: Arc<Box<dyn Fn () -> Box<dyn Base + Send + Sync> + Send + Sync>> = instance.get(&str_handler_copy).unwrap().clone();
    let processor = tokio::spawn(async move {
        let permit = permits_clone.clone().acquire_owned().await.unwrap();
        let permit_handler = permits_per_handler_clone.clone().acquire_owned().await.unwrap();
        let result = instance_to_run().run(str_handler_copy, data_str, communication_line_clone.clone(), shared_state_clone.clone()).await;
        drop(permit);
        drop(permit_handler);
        result
    });
    handle_common_ending(processor, manager.get_ref().clone(), str_handler).await
}

/// Handles file download requests.
///
/// # Arguments
///
/// * `file_id` - The unique identifier of the file to download.
/// * `permits` - A semaphore to manage concurrent downloads, ensuring controlled access.
/// * `instance` - A map of handler instances that are used to process the file retrieval logic.
/// * `manager` - A shutdown notifier to monitor and handle service shutdowns.
/// * `communication_line` - The communication line (`MultiBus`) for message exchange between components.
/// * `shared_state` - The shared state (`SharedState`) for managing application-wide state or data.
///
/// # Returns
///
/// * [`Ok(CustomResponse)`](CustomResponse) - On success, returns a response containing the file data as a binary stream.
/// * [`Err(status::Custom<String>)`](status::Custom) - On failure, returns an appropriate error status and message.
///
/// ## Failure Cases:
/// - If the file is not found or fails to process, returns a `500 Internal Server Error`.
/// - If the service is interrupted or forcefully stopped, returns a `503 Service Unavailable`.
/// - If there is a general error during the process, returns a `400 Bad Request`.
pub async fn process_download(path: Path<String>, permits: Data<Arc<Semaphore>>, manager: Data<Arc<Notify>>, request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>, instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>, communication_line: Data<Arc<MultiBus>>, shared_state: Data<Arc<SharedState>>) -> impl Responder {
    let file_id = path.into_inner();
    let file_id_clone = file_id.clone();
    let str_handler: String = "download".to_string();
    let str_handler_copy = str_handler.clone();
    let permits_clone = permits.get_ref().clone();
    let permits_per_handler_clone = request_max_per_handler.get_ref().get(&str_handler_copy).unwrap().clone();
    let communication_line_clone = communication_line.get_ref().clone();
    let shared_state_clone = shared_state.get_ref().clone();
    let instance_to_run: Arc<Box<dyn Fn () -> Box<dyn Base + Send + Sync> + Send + Sync>> = instance.get_ref().get(&str_handler_copy).unwrap().clone();
    let processor = tokio::spawn(async move {
        let permit = permits_clone.clone().acquire_owned().await.unwrap();
        let permit_handler = permits_per_handler_clone.clone().acquire_owned().await.unwrap();
        let result = instance_to_run().run_file(str_handler_copy, file_id_clone, communication_line_clone.clone(), shared_state_clone.clone()).await;
        drop(permit);
        drop(permit_handler);
        result
    });
    tokio::select! {
        result = processor => {
            println!("Processed a request to handler {}", str_handler);
            match result {
                Ok(result) => {
                    match result {
                        Ok((mut source, size)) => {
                            let response_stream = stream! {
                                let mut buffer = vec![0; 64 * 1024];
                                loop {
                                    match source.read(&mut buffer).await {
                                        Ok(0) => break,
                                        Ok(n) => yield Ok(Bytes::copy_from_slice(&buffer[0..n])),
                                        Err(e) => yield Err(e)
                                    }
                                }
                            };
                            HttpResponse::Ok().streaming(response_stream)
                        },
                        Err(e) => {
                            let error_body = json!({
                                "status": "Error",
                                "message": e.to_string()
                            }).to_string();
                            HttpResponse::InternalServerError().json(error_body)
                        },
                    }
                }
                Err(e) => {
                     let error_body = json!({
                        "status": "Error",
                        "message": e.to_string()
                    }).to_string();
                    HttpResponse::BadRequest().json(error_body)
                }
            }
        },
        _ = manager.notified() => {
            println!("Service is stopped forcefully, request aborted");
             let error_body = json!({
                "status": "Error",
                "message": "Service closed forcefully"
            }).to_string();
            HttpResponse::ServiceUnavailable().json(error_body)
        }
    }
}

/// Handles file upload requests.
///
/// # Arguments
///
/// * `file_name` - The name of the file being uploaded.
/// * `body` - The request body containing the file data in a stream.
/// * `content_length` - The `Content-Length` header value, representing the size of the uploaded file.
/// * `permits` - A semaphore to manage concurrent uploads, ensuring controlled access.
/// * `instance` - A map of handler instances that are used to process the upload logic.
/// * `manager` - A shutdown notifier to monitor and handle service shutdowns.
/// * `communication_line` - The communication line (`MultiBus`) for message exchange between components.
/// * `shared_state` - The shared state (`SharedState`) for managing application-wide state or data.
/// * `has_been_called` - A flag indicating whether the service has been called or interrupted.
///
/// # Returns
///
/// A JSON response indicating the processing result of the uploaded file.
///
/// - On success, returns a JSON object with a `"status"` of `"Ok"` and a corresponding `"message"`.
/// - On failure, returns a JSON object with a `"status"` of `"Error"` and an appropriate error `"message"`.
pub async fn process_upload(path: Path<String>, mut payload: web::Payload, manager: Data<Arc<Notify>>, permits: Data<Arc<Semaphore>>, instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>, request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>, communication_line: Data<Arc<MultiBus>>, shared_state: Data<Arc<SharedState>>) -> impl Responder {
    let file_name = path.into_inner();
    let lower_bound: usize = 5;
    let str_handler = "upload".to_string();
    let str_handler_copy = str_handler.clone();
    let permits_clone = permits.get_ref().clone();
    let permits_per_handler_clone = request_max_per_handler.get_ref().clone().get(&str_handler_copy).unwrap().clone();
    let communication_line_clone = communication_line.get_ref().clone();
    let shared_state_clone = shared_state.get_ref().clone();

    let (tx, mut rx) = mpsc::channel::<Bytes>(64);
    let instance_to_run: Arc<Box<dyn Fn () -> Box<dyn Base + Send + Sync> + Send + Sync>> = instance.get_ref().get(&str_handler_copy).unwrap().clone();
    let processor = spawn(async move {
        let permit = permits_clone.clone().acquire_owned().await.unwrap();
        let permit_handler = permits_per_handler_clone.clone().acquire_owned().await.unwrap();
        let mut stream_from_channel = ReceiverStream::new(rx);
        let result = instance_to_run().run_stream(
            str_handler_copy,
            Box::pin(stream! {
                while let Some(chunk) = stream_from_channel.next().await {
                    yield chunk;
                }
            }),
            file_name,
            lower_bound,
            communication_line_clone.clone(),
            shared_state_clone.clone()).await;
        drop(permit);
        drop(permit_handler);
        result
    });

    let mut total_size = 0;
    loop {
        match timeout(Duration::from_secs(20), payload.next()).await {
            Ok(None) => {
                if total_size < lower_bound {
                    drop(tx);
                    return HttpResponse::BadRequest().json(json!({"status": "Error", "message": "Length of the stream does not match the Lower-Bound header"}));
                }
                break;
            }
            Ok(Some(Ok(chunk))) => {
                if chunk.is_empty() {
                    break;
                }
                total_size += chunk.len();
                if tx.send(chunk.clone()).await.is_err() {
                    break;
                }
            }
            Ok(Some(Err(e))) => {
                eprintln!("Error reading stream: {}", e);
                break;
            }
            Err(_) => {
                eprintln!("Stream read timed out after 20 seconds");
                drop(tx);
                return HttpResponse::RequestTimeout().json(json!({"status": "Error", "message": "Stream read timed out"}));
            }
        }
    }
    drop(tx);
    handle_common_ending(processor, manager.get_ref().clone(), str_handler).await
}

/// Handles requests to retrieve metadata for a specific file.
///
/// # Arguments
/// * `file_id` - The unique identifier of the file whose metadata is being requested.
/// * `permits` - A semaphore to manage concurrent accesses, ensuring controlled concurrency.
/// * `instance` - A map containing registered handler instances, keyed by handler names.
/// * `manager` - A shutdown notifier to monitor and handle service shutdowns.
/// * `communication_line` - The communication line (`MultiBus`) for inter-component communication.
/// * `shared_state` - The shared state (`SharedState`) for managing application-wide state or data.
///
/// # Returns
/// * A JSON response containing the metadata if the operation succeeds.
/// * A JSON response with an error message if the operation fails.
pub async fn process_metadata(path: Path<String>, manager: Data<Arc<Notify>>, permits: Data<Arc<Semaphore>>, instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>, request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>, communication_line: Data<Arc<MultiBus>>, shared_state: Data<Arc<SharedState>>) -> impl Responder {
    let file_id = path.into_inner();
    let str_handler: String = "metadata".to_string();
    let str_handler_copy = str_handler.clone();
    let permits_clone = permits.get_ref().clone();
    let permits_per_handler_clone = request_max_per_handler.get_ref().clone().get(&str_handler_copy).unwrap().clone();
    let communication_line_clone = communication_line.get_ref().clone();
    let shared_state_clone = shared_state.get_ref().clone();
    let instance_to_run: Arc<Box<dyn Fn () -> Box<dyn Base + Send + Sync> + Send + Sync>> = instance.get_ref().get(&str_handler_copy).unwrap().clone();
    let processor = spawn(async move {
        let permit = permits_clone.clone().acquire_owned().await.unwrap();
        let permit_handler = permits_per_handler_clone.clone().acquire_owned().await.unwrap();
        let result = instance_to_run().run_metadata(str_handler_copy, file_id, communication_line_clone.clone(), shared_state_clone.clone()).await;
        drop(permit);
        drop(permit_handler);
        result
    });
    handle_common_ending(processor, manager.get_ref().clone(), str_handler).await
}