#![allow(unused)]
use crate::common::interface::middleware::{
DataMiddlewareHandle, DataStoreMiddlewareHandle, DownloadMiddlewareHandle,
};
use crate::common::model::data::DataEvent;
use crate::common::model::workflow_profile::TaskProfileSnapshot;
use crate::common::model::{Request, Response};
use crate::common::state::State;
use crate::errors::Result;
use futures::future::join_all;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct MiddlewareManager {
pub download_middleware: Arc<RwLock<Vec<DownloadMiddlewareHandle>>>,
download_index: Arc<RwLock<HashMap<String, usize>>>,
pub data_middleware: Arc<RwLock<Vec<DataMiddlewareHandle>>>,
data_index: Arc<RwLock<HashMap<String, usize>>>,
pub store_middleware: Arc<RwLock<Vec<DataStoreMiddlewareHandle>>>,
store_index: Arc<RwLock<HashMap<String, usize>>>,
pub state: Arc<State>,
}
impl MiddlewareManager {
pub fn new(state: Arc<State>) -> Self {
MiddlewareManager {
download_middleware: Default::default(),
download_index: Default::default(),
data_middleware: Default::default(),
data_index: Default::default(),
store_middleware: Default::default(),
store_index: Default::default(),
state,
}
}
pub async fn register_download_middleware(&self, middleware: DownloadMiddlewareHandle) {
let mut middlewares = self.download_middleware.write().await;
let mut index = self.download_index.write().await;
let name = middleware.lock().await.name();
let pos = middlewares.len();
middlewares.push(middleware);
index.insert(name, pos);
}
pub async fn register_download_middleware_from_vec(
&self,
middlewares: Vec<DownloadMiddlewareHandle>,
) {
let mut existing = self.download_middleware.write().await;
let mut index = self.download_index.write().await;
for middleware in middlewares {
let name = middleware.lock().await.name();
let pos = existing.len();
existing.push(middleware);
index.insert(name, pos);
}
}
pub async fn register_data_middleware(&self, middleware: DataMiddlewareHandle) {
let mut middlewares = self.data_middleware.write().await;
let mut index = self.data_index.write().await;
let name = middleware.lock().await.name();
let pos = middlewares.len();
middlewares.push(middleware);
index.insert(name, pos);
}
pub async fn register_data_middleware_from_vec(&self, middlewares: Vec<DataMiddlewareHandle>) {
let mut existing = self.data_middleware.write().await;
let mut index = self.data_index.write().await;
for middleware in middlewares {
let name = middleware.lock().await.name();
let pos = existing.len();
existing.push(middleware);
index.insert(name, pos);
}
}
pub async fn register_store_middleware(&self, middleware: DataStoreMiddlewareHandle) {
let mut middlewares = self.store_middleware.write().await;
let mut index = self.store_index.write().await;
let name = middleware.lock().await.name();
let pos = middlewares.len();
middlewares.push(middleware);
index.insert(name, pos);
}
pub async fn register_store_middleware_from_vec(
&self,
middlewares: Vec<DataStoreMiddlewareHandle>,
) {
let mut existing = self.store_middleware.write().await;
let mut index = self.store_index.write().await;
for middleware in middlewares {
let name = middleware.lock().await.name();
let pos = existing.len();
existing.push(middleware);
index.insert(name, pos);
}
}
async fn get_download_middleware(
&self,
middleware_name: &[String],
profile: &TaskProfileSnapshot,
) -> Vec<(DownloadMiddlewareHandle, u32)> {
let middlewares = self.download_middleware.read().await;
let index = self.download_index.read().await;
let mut out = Vec::with_capacity(middleware_name.len());
for name in middleware_name {
if let Some(&pos) = index.get(name)
&& let Some(handle) = middlewares.get(pos)
{
let middleware_guard = handle.lock().await;
let middleware_weight = profile
.download_middleware
.iter()
.find(|b| b.name == *name)
.map(|b| b.weight as u32)
.unwrap_or_else(|| middleware_guard.weight());
out.push((handle.clone(), middleware_weight));
}
}
out
}
async fn get_data_middleware(
&self,
middleware_name: &[String],
profile: &TaskProfileSnapshot,
) -> Vec<(DataMiddlewareHandle, u32)> {
let middlewares = self.data_middleware.read().await;
let index = self.data_index.read().await;
let mut out = Vec::with_capacity(middleware_name.len());
for name in middleware_name {
if let Some(&pos) = index.get(name)
&& let Some(handle) = middlewares.get(pos)
{
let middleware_guard = handle.lock().await;
let middleware_weight = profile
.data_middleware
.iter()
.find(|b| b.name == *name)
.map(|b| b.weight as u32)
.unwrap_or_else(|| middleware_guard.weight());
out.push((handle.clone(), middleware_weight));
}
}
out
}
async fn get_store_middleware(
&self,
middleware_name: &[String],
) -> Vec<DataStoreMiddlewareHandle> {
let middlewares = self.store_middleware.read().await;
let index = self.store_index.read().await;
let mut out = Vec::with_capacity(middleware_name.len());
for name in middleware_name {
if let Some(&pos) = index.get(name)
&& let Some(handle) = middlewares.get(pos)
{
out.push(handle.clone());
}
}
out
}
pub async fn handle_request(
&self,
request: Request,
profile: &TaskProfileSnapshot,
) -> Option<Request> {
let mut req = request;
let mut middleware: Vec<(DownloadMiddlewareHandle, u32)> = self
.get_download_middleware(&req.download_middleware, profile)
.await;
middleware.sort_by(|x, y| x.1.cmp(&y.1));
for (middleware, _) in middleware {
let mut middleware = middleware.lock().await;
match middleware.before_request(req, profile).await {
Some(next_req) => req = next_req,
None => return None,
}
}
Some(req)
}
pub async fn handle_response(
&self,
response: Response,
profile: &TaskProfileSnapshot,
) -> Option<Response> {
let mut resp = response;
let mut middleware: Vec<(DownloadMiddlewareHandle, u32)> = self
.get_download_middleware(&resp.download_middleware, profile)
.await;
middleware.sort_by(|x, y| y.1.cmp(&x.1));
for (middleware, _) in middleware {
let mut middleware = middleware.lock().await;
match middleware.after_response(resp, profile).await {
Some(next_resp) => resp = next_resp,
None => return None,
}
}
Some(resp)
}
pub async fn handle_data(
&self,
data: DataEvent,
profile: &TaskProfileSnapshot,
) -> Option<DataEvent> {
let mut data = data;
let mut middleware: Vec<(DataMiddlewareHandle, u32)> = self
.get_data_middleware(&data.data_middleware, profile)
.await;
middleware.sort_by(|x, y| x.1.cmp(&y.1));
for (middleware, _) in middleware {
let mut middleware = middleware.lock().await;
match middleware.handle_data(data, profile).await {
Some(next_data) => data = next_data,
None => return None,
}
}
Some(data)
}
pub async fn handle_store_data(
&self,
data: DataEvent,
profile: &TaskProfileSnapshot,
) -> HashMap<String, Result<()>> {
let middleware = self.get_store_middleware(&data.data_middleware).await;
let tasks = middleware.into_iter().map(|m| {
let data_cloned = data.clone();
let module_name = data.module.clone();
async move {
let mut middleware = m.lock().await;
let middleware_name = middleware.name();
let name = format!(
"{}, schema: {}, table: {}",
middleware_name, module_name, middleware_name
);
let result = match middleware.before_store(profile).await {
Ok(()) => match middleware.store_data(data_cloned, profile).await {
Ok(()) => middleware.after_store(profile).await,
Err(e) => Err(e),
},
Err(e) => Err(e),
};
(name, result)
}
});
let mut results: Vec<(String, Result<()>)> = join_all(tasks).await;
results.into_iter().filter(|x| x.1.is_err()).collect()
}
pub async fn handle_store_data_with_middleware(
&self,
data: DataEvent,
middleware: Vec<String>,
profile: &TaskProfileSnapshot,
) -> HashMap<String, Result<()>> {
let middleware = self.get_store_middleware(&middleware).await;
let tasks = middleware.into_iter().map(|m| {
let data_cloned = data.clone();
async move {
let mut middleware = m.lock().await;
let name = middleware.name();
let result = match middleware.before_store(profile).await {
Ok(()) => match middleware.store_data(data_cloned, profile).await {
Ok(()) => middleware.after_store(profile).await,
Err(e) => Err(e),
},
Err(e) => Err(e),
};
(name, result)
}
});
let mut results: Vec<(String, Result<()>)> = join_all(tasks).await;
results.into_iter().filter(|x| x.1.is_err()).collect()
}
}