use actix_web::{
web::{self, Data, Path},
middleware::Next,
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
HttpResponse, Responder, Error, HttpMessage,
};
use serde_json::json;
use std::collections::{HashMap, HashSet};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
use tokio::{
sync::{Semaphore, Notify, mpsc},
io::AsyncReadExt,
net::TcpStream,
spawn,
task::JoinHandle,
time::timeout,
};
use tokio_rustls::TlsStream;
use tokio_stream::wrappers::ReceiverStream;
use futures_util::StreamExt;
use async_stream::stream;
use bytes::Bytes;
use x509_parser::prelude::*;
use crate::{
manager::{Base, Config, SharedState},
multibus::MultiBus,
};
pub async fn authentication_middleware(req: ServiceRequest, next: Next<impl MessageBody>) -> Result<ServiceResponse<impl MessageBody>, Error> {
let config = req
.app_data::<Data<Config>>()
.map(|c| c.get_ref().clone());
if let Some(config) = config {
if let Some(auth_header) = req.headers().get("Authorization") {
if auth_header.to_str().unwrap_or("") != config.api_key {
return Err(actix_web::error::ErrorUnauthorized("Invalid API Key"));
}
} else {
return Err(actix_web::error::ErrorUnauthorized("Missing API Key"));
}
let allowed_names = req
.app_data::<Data<Arc<Option<HashSet<String>>>>>()
.map(|c| c.get_ref().clone())
.unwrap_or_default();
if let Some(allowed_cns) = allowed_names.clone().as_ref() {
if let Some(tls_stream) = req.extensions().get::<Arc<TlsStream<TcpStream>>>().cloned() {
if let Some(cert) = tls_stream.get_ref().1.peer_certificates().and_then(|certs| certs.get(0)) {
let (_, x509) = X509Certificate::from_der(cert.as_ref()).ok().unwrap();
for attr in x509.subject().iter_common_name() {
if let Ok(cn) = attr.as_str() {
if !allowed_cns.contains(cn) {
return Err(actix_web::error::ErrorForbidden("Forbidden: CN not allowed"));
}
} else {
return Err(actix_web::error::ErrorForbidden("Forbidden: Missing CN"));
}
}
}
}
}
}
next.call(req).await
}
pub async fn unauthorized() -> impl Responder {
HttpResponse::Unauthorized().json(json!({"message": "Unauthorized: Access is denied"}))
}
pub async fn shutdown_from_http(shutdown_flag: Data<AtomicBool>, manager: Data<Arc<Notify>>) -> impl Responder {
shutdown_flag.store(true, Ordering::SeqCst);
manager.notify_waiters();
HttpResponse::Ok().json(json!({"status": "Shutting down"}))
}
async fn handle_common_ending(processor: JoinHandle<Result<String, Box<dyn std::error::Error + Send + Sync>>>, manager: Arc<Notify>, str_handler: String) -> HttpResponse {
tokio::select! {
result = processor => {
println!("Processed a request to handler {}", str_handler);
match result {
Ok(result) => {
match result {
Ok(output) => HttpResponse::Ok().json(json!({"status": "Ok", "message": output})),
Err(e) => HttpResponse::InternalServerError().json(json!({"status": "Error", "message": e.to_string()})),
}
}
Err(e) => {
HttpResponse::InternalServerError().json(json!({"status": "Error", "message": e.to_string()}))
}
}
},
_ = manager.notified() => {
println!("Service is stopped forcefully, request aborted");
HttpResponse::InternalServerError().json(json!({"status": "Error", "message": "Service closed forcefully"}))
}
}
}
pub async fn process_request(path: Path<String>, mut payload: web::Payload, permits: Data<Arc<Semaphore>>, instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>, manager: Data<Arc<Notify>>, communication_line: Data<Arc<MultiBus>>, shared_state: Data<Arc<SharedState>>, request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>) -> impl Responder {
let handler_name = path.into_inner();
let str_handler = handler_name.to_string();
let str_handler_copy = str_handler.clone();
let permits_clone = permits.as_ref().clone();
let permits_per_handler_clone = request_max_per_handler.as_ref().clone().get(&handler_name).unwrap().clone();
let communication_line_clone = communication_line.as_ref().clone();
let shared_state_clone = shared_state.as_ref().clone();
let data_str = String::from_utf8_lossy(&payload.to_bytes().await.unwrap()).to_string();
let instance_to_run: Arc<Box<dyn Fn () -> Box<dyn Base + Send + Sync> + Send + Sync>> = instance.get(&str_handler_copy).unwrap().clone();
let processor = tokio::spawn(async move {
let permit = permits_clone.clone().acquire_owned().await.unwrap();
let permit_handler = permits_per_handler_clone.clone().acquire_owned().await.unwrap();
let result = instance_to_run().run(str_handler_copy, data_str, communication_line_clone.clone(), shared_state_clone.clone()).await;
drop(permit);
drop(permit_handler);
result
});
handle_common_ending(processor, manager.get_ref().clone(), str_handler).await
}
pub async fn process_download(path: Path<String>, permits: Data<Arc<Semaphore>>, manager: Data<Arc<Notify>>, request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>, instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>, communication_line: Data<Arc<MultiBus>>, shared_state: Data<Arc<SharedState>>) -> impl Responder {
let file_id = path.into_inner();
let file_id_clone = file_id.clone();
let str_handler: String = "download".to_string();
let str_handler_copy = str_handler.clone();
let permits_clone = permits.get_ref().clone();
let permits_per_handler_clone = request_max_per_handler.get_ref().get(&str_handler_copy).unwrap().clone();
let communication_line_clone = communication_line.get_ref().clone();
let shared_state_clone = shared_state.get_ref().clone();
let instance_to_run: Arc<Box<dyn Fn () -> Box<dyn Base + Send + Sync> + Send + Sync>> = instance.get_ref().get(&str_handler_copy).unwrap().clone();
let processor = tokio::spawn(async move {
let permit = permits_clone.clone().acquire_owned().await.unwrap();
let permit_handler = permits_per_handler_clone.clone().acquire_owned().await.unwrap();
let result = instance_to_run().run_file(str_handler_copy, file_id_clone, communication_line_clone.clone(), shared_state_clone.clone()).await;
drop(permit);
drop(permit_handler);
result
});
tokio::select! {
result = processor => {
println!("Processed a request to handler {}", str_handler);
match result {
Ok(result) => {
match result {
Ok((mut source, size)) => {
let response_stream = stream! {
let mut buffer = vec![0; 64 * 1024];
loop {
match source.read(&mut buffer).await {
Ok(0) => break,
Ok(n) => yield Ok(Bytes::copy_from_slice(&buffer[0..n])),
Err(e) => yield Err(e)
}
}
};
HttpResponse::Ok().streaming(response_stream)
},
Err(e) => {
let error_body = json!({
"status": "Error",
"message": e.to_string()
}).to_string();
HttpResponse::InternalServerError().json(error_body)
},
}
}
Err(e) => {
let error_body = json!({
"status": "Error",
"message": e.to_string()
}).to_string();
HttpResponse::BadRequest().json(error_body)
}
}
},
_ = manager.notified() => {
println!("Service is stopped forcefully, request aborted");
let error_body = json!({
"status": "Error",
"message": "Service closed forcefully"
}).to_string();
HttpResponse::ServiceUnavailable().json(error_body)
}
}
}
pub async fn process_upload(path: Path<String>, mut payload: web::Payload, manager: Data<Arc<Notify>>, permits: Data<Arc<Semaphore>>, instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>, request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>, communication_line: Data<Arc<MultiBus>>, shared_state: Data<Arc<SharedState>>) -> impl Responder {
let file_name = path.into_inner();
let lower_bound: usize = 5;
let str_handler = "upload".to_string();
let str_handler_copy = str_handler.clone();
let permits_clone = permits.get_ref().clone();
let permits_per_handler_clone = request_max_per_handler.get_ref().clone().get(&str_handler_copy).unwrap().clone();
let communication_line_clone = communication_line.get_ref().clone();
let shared_state_clone = shared_state.get_ref().clone();
let (tx, mut rx) = mpsc::channel::<Bytes>(64);
let instance_to_run: Arc<Box<dyn Fn () -> Box<dyn Base + Send + Sync> + Send + Sync>> = instance.get_ref().get(&str_handler_copy).unwrap().clone();
let processor = spawn(async move {
let permit = permits_clone.clone().acquire_owned().await.unwrap();
let permit_handler = permits_per_handler_clone.clone().acquire_owned().await.unwrap();
let mut stream_from_channel = ReceiverStream::new(rx);
let result = instance_to_run().run_stream(
str_handler_copy,
Box::pin(stream! {
while let Some(chunk) = stream_from_channel.next().await {
yield chunk;
}
}),
file_name,
lower_bound,
communication_line_clone.clone(),
shared_state_clone.clone()).await;
drop(permit);
drop(permit_handler);
result
});
let mut total_size = 0;
loop {
match timeout(Duration::from_secs(20), payload.next()).await {
Ok(None) => {
if total_size < lower_bound {
drop(tx);
return HttpResponse::BadRequest().json(json!({"status": "Error", "message": "Length of the stream does not match the Lower-Bound header"}));
}
break;
}
Ok(Some(Ok(chunk))) => {
if chunk.is_empty() {
break;
}
total_size += chunk.len();
if tx.send(chunk.clone()).await.is_err() {
break;
}
}
Ok(Some(Err(e))) => {
eprintln!("Error reading stream: {}", e);
break;
}
Err(_) => {
eprintln!("Stream read timed out after 20 seconds");
drop(tx);
return HttpResponse::RequestTimeout().json(json!({"status": "Error", "message": "Stream read timed out"}));
}
}
}
drop(tx);
handle_common_ending(processor, manager.get_ref().clone(), str_handler).await
}
pub async fn process_metadata(path: Path<String>, manager: Data<Arc<Notify>>, permits: Data<Arc<Semaphore>>, instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>, request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>, communication_line: Data<Arc<MultiBus>>, shared_state: Data<Arc<SharedState>>) -> impl Responder {
let file_id = path.into_inner();
let str_handler: String = "metadata".to_string();
let str_handler_copy = str_handler.clone();
let permits_clone = permits.get_ref().clone();
let permits_per_handler_clone = request_max_per_handler.get_ref().clone().get(&str_handler_copy).unwrap().clone();
let communication_line_clone = communication_line.get_ref().clone();
let shared_state_clone = shared_state.get_ref().clone();
let instance_to_run: Arc<Box<dyn Fn () -> Box<dyn Base + Send + Sync> + Send + Sync>> = instance.get_ref().get(&str_handler_copy).unwrap().clone();
let processor = spawn(async move {
let permit = permits_clone.clone().acquire_owned().await.unwrap();
let permit_handler = permits_per_handler_clone.clone().acquire_owned().await.unwrap();
let result = instance_to_run().run_metadata(str_handler_copy, file_id, communication_line_clone.clone(), shared_state_clone.clone()).await;
drop(permit);
drop(permit_handler);
result
});
handle_common_ending(processor, manager.get_ref().clone(), str_handler).await
}