use crate::core::traits::Base;
use actix_web::web::{Data, Path};
use actix_web::{HttpResponse, Responder, web};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::{Notify, Semaphore};
use tokio::task::JoinHandle;
/// Responds with HTTP 401 and a JSON body stating that access is denied.
pub async fn unauthorized() -> impl Responder {
    let body = json!({"message": "Unauthorized: Access is denied"});
    HttpResponse::Unauthorized().json(body)
}
/// HTTP endpoint that requests a service shutdown.
///
/// Sets the shared `shutdown_flag` to `true`, wakes every task currently
/// waiting on `manager`, and answers with HTTP 200 and a JSON status body.
pub async fn shutdown_from_http(shutdown_flag: Data<Arc<AtomicBool>>, manager: Data<Arc<Notify>>) -> impl Responder {
    println!("shutdown from http");

    // Raise the flag before waking waiters so anything woken by the
    // notification already observes the new value.
    let flag: &AtomicBool = shutdown_flag.get_ref();
    flag.store(true, Ordering::SeqCst);

    let notifier: &Notify = manager.get_ref();
    notifier.notify_waiters();

    let body = json!({"status": "Shutting down"});
    HttpResponse::Ok().json(body)
}
pub(crate) async fn handle_common_ending(processor: JoinHandle<Result<String, Box<dyn std::error::Error + Send + Sync>>>, manager: Arc<Notify>, str_handler: String) -> HttpResponse {
tokio::select! {
result = processor => {
#[cfg(feature = "log")]
println!("Processed a request to handler {}", str_handler);
match result {
Ok(result) => {
match result {
Ok(output) => HttpResponse::Ok().json(json!({"status": "Ok", "message": output})),
Err(e) => HttpResponse::InternalServerError().json(json!({"status": "Error", "message": e.to_string()})),
}
}
Err(e) => {
HttpResponse::InternalServerError().json(json!({"status": "Error", "message": e.to_string()}))
}
}
},
_ = manager.notified() => {
#[cfg(feature = "log")]
println!("Service is stopped forcefully, request aborted");
HttpResponse::InternalServerError().json(json!({"status": "Error", "message": "Service closed forcefully"}))
}
}
}
pub async fn process_request(
path: Path<String>,
payload: web::Payload,
permits: Data<Arc<Semaphore>>,
instance: Data<Arc<HashMap<String, Arc<Box<dyn Fn() -> Box<dyn Base + Send + Sync> + Send + Sync>>>>>,
manager: Data<Arc<Notify>>,
request_max_per_handler: Data<Arc<HashMap<String, Arc<Semaphore>>>>,
) -> impl Responder {
let handler_name: String = path.into_inner();
let permits_arc = permits.get_ref().clone();
let per_handler = request_max_per_handler.get_ref().get(&handler_name).expect("per-handler semaphore missing").clone();
let instance_factory = instance.get_ref().get(&handler_name).expect("handler instance missing").clone();
let body_bytes = payload.to_bytes().await.expect("payload read failed");
let data_str = match std::str::from_utf8(&body_bytes) {
Ok(s) => s.to_owned(),
Err(_) => String::from_utf8_lossy(&body_bytes).into_owned(),
};
let task_handler_name = handler_name.clone();
let processor = tokio::spawn(async move {
let _global = permits_arc.acquire_owned().await.unwrap();
let _local = per_handler.acquire_owned().await.unwrap();
instance_factory().run(task_handler_name, data_str).await
});
handle_common_ending(processor, manager.get_ref().clone(), handler_name).await
}