use super::{
assembler::{ConfigAssembler, ModuleAssembler, ModuleConfigAssemblyInput},
profile_loader::{LoadedProfile, ProfileLoadRequest, ProfileLoader},
repository::TaskRepository,
task::Task,
};
use crate::errors::{ModuleError, ModuleError::ModuleNotFound, Result};
use crate::cacheable::{CacheAble, CacheService};
use crate::common::model::login_info::LoginInfo;
use crate::common::model::message::TaskEvent;
use crate::common::model::{ModuleConfig, NodeDispatchEnvelope, NodeErrorEnvelope, Response};
use crate::common::state::State;
use crate::engine::task::module::Module;
use crate::engine::task::module_dag_processor::ModuleDagProcessor;
use crate::engine::task::parser_error_adapter::{
ErrorEnvelopeSeed, ParserDispatchSeed, extract_error_envelope_seed,
extract_parser_dispatch_seed,
};
use dashmap::DashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use uuid::Uuid;
/// Assembles [`Task`] values (account, platform, runnable modules, login info) and keeps the
/// assembled result in a short-lived in-memory cache.
pub struct TaskFactory {
/// Source of accounts, platforms, modules and the relation rows between them.
repository: TaskRepository,
/// Cache service used to read and publish per-module [`ModuleConfig`] values.
cache_service: Arc<CacheService>,
/// Optional cache service used to look up [`LoginInfo`]; when `None`, login info is never loaded.
cookie_service: Option<Arc<CacheService>>,
/// Registry queried by module name to obtain the module implementation.
module_assembler: Arc<tokio::sync::RwLock<ModuleAssembler>>,
/// Loads the profile (snapshot and workflow) attached to each module.
profile_loader: ProfileLoader,
/// In-memory cache of assembled tasks; entries carry their own expiry instant.
cache: Arc<DashMap<String, CacheEntry>>,
/// Shared application state: provides the runtime configuration and the cache service
/// handed to each module's processor.
state: Arc<State>,
}
/// Dependencies required to build a [`TaskFactory`] via [`TaskFactory::new`].
pub struct TaskFactoryConfig {
/// Repository the factory loads accounts, platforms, modules and relations from.
pub repository: TaskRepository,
/// Cache service used for per-module configuration.
pub cache_service: Arc<CacheService>,
/// Optional cache service used for login-info lookups.
pub cookie_service: Option<Arc<CacheService>>,
/// Shared registry of module implementations.
pub module_assembler: Arc<tokio::sync::RwLock<ModuleAssembler>>,
/// Shared application state.
pub state: Arc<State>,
}
/// How long an assembled task stays valid in the in-memory cache after it is inserted.
const CACHE_TTL: Duration = Duration::from_secs(30);
/// A cached task together with the instant after which it must no longer be served.
struct CacheEntry {
/// The assembled task shared with every caller that hits the cache.
task: Arc<Task>,
/// Insertion time plus [`CACHE_TTL`]; lookups at or after this instant treat the entry as expired.
expires_at: Instant,
}
impl TaskFactory {
pub fn new(config: TaskFactoryConfig) -> Self {
Self {
repository: config.repository,
cache_service: config.cache_service,
cookie_service: config.cookie_service,
module_assembler: config.module_assembler,
profile_loader: ProfileLoader::default(),
cache: Arc::new(DashMap::new()),
state: config.state,
}
}
/// Looks up the stored login info for `id` through the cookie cache service.
///
/// Returns `None` when no cookie service is configured, when the cache holds no entry for
/// `id`, or when the lookup fails. Each of these cases logs its own warning, so the
/// "not configured" message is only emitted when the service is actually absent.
pub async fn login_info(&self, id: &str) -> Option<LoginInfo> {
    let Some(sync) = self.cookie_service.as_ref() else {
        log::warn!(
            "cookie service not configured; skip login_info lookup for id={}",
            id
        );
        return None;
    };
    match LoginInfo::sync(id, sync).await {
        Ok(Some(info)) => Some(info),
        Ok(None) => {
            let key = <LoginInfo as CacheAble>::cache_id(id, sync);
            log::warn!("cookie not found in cache: key={}", key);
            None
        }
        Err(err) => {
            log::warn!("cookies load error {}", err);
            None
        }
    }
}
pub async fn create_task_from_model(&self, task_model: &TaskEvent) -> Result<Task> {
let mut task = (*self
.create_task_with_modules(&task_model.platform, &task_model.account, task_model.run_id)
.await?)
.clone();
task.run_id = task_model.run_id;
task.modules.iter_mut().for_each(|m| {
Self::bind_module_execution(m, task_model.run_id);
});
if let Some(names) = &task_model.module
&& !names.is_empty()
{
let want: std::collections::HashSet<&str> = names.iter().map(|s| s.as_str()).collect();
task.modules.retain(|m| want.contains(m.module.name()));
}
Ok(task)
}
async fn load_module_profile(
&self,
account_name: &str,
platform_name: &str,
module_name: &str,
module_impl: Arc<dyn crate::common::interface::ModuleTrait>,
module_config: &ModuleConfig,
) -> Result<LoadedProfile> {
let namespace = self.state.config.read().await.name.clone();
self.profile_loader
.load(ProfileLoadRequest {
namespace: &namespace,
account: account_name,
platform: platform_name,
module_name,
updated_by: "task_factory",
module_impl,
module_config,
})
.await
.map_err(|err| {
ModuleError::Model(
std::io::Error::other(format!(
"failed to load profile for {account_name}-{platform_name}-{module_name}: {err}"
))
.into(),
)
.into()
})
}
/// Returns the version of the profile attached to `module`, or `0` when it has none.
fn module_profile_version(module: &Module) -> u64 {
    match module.profile.as_ref() {
        Some(profile) => profile.version,
        None => u64::default(),
    }
}
/// Returns the `dag_version` entry of the module's workflow metadata, or an empty string
/// when the module has no workflow or the entry is missing.
fn module_dag_version(module: &Module) -> String {
    let Some(workflow) = module.workflow.as_ref() else {
        return String::default();
    };
    match workflow.metadata.get("dag_version") {
        Some(version) => version.clone(),
        None => String::default(),
    }
}
/// Stamps `run_id` on `module` and its processor, then hands the processor the module's
/// current profile version and DAG version.
fn bind_module_execution(module: &mut Module, run_id: Uuid) {
    module.run_id = run_id;
    module.processor.set_run_id(run_id);
    let profile_version = Self::module_profile_version(module);
    let dag_version = Self::module_dag_version(module);
    module
        .processor
        .set_execution_binding(profile_version, dag_version);
}
/// Re-reads the module's configuration from the cache service and, when one is found,
/// reloads its profile and workflow from it; the module is then rebound to `run_id`.
///
/// A cache miss or a cache read error leaves config, profile and workflow untouched and
/// only performs the rebind. A profile-loading failure is returned to the caller.
async fn refresh_module_runtime_from_cache(
    &self,
    module: &mut Module,
    run_id: Uuid,
) -> Result<()> {
    let cached = ModuleConfig::sync(&module.id(), &self.cache_service).await;
    if let Ok(Some(config)) = cached {
        let reloaded = self
            .load_module_profile(
                &module.account.name,
                &module.platform.name,
                module.module.name(),
                module.module.clone(),
                &config,
            )
            .await?;
        module.config = Arc::new(config);
        module.profile = Some(Arc::new(reloaded.snapshot));
        module.workflow = Some(Arc::new(reloaded.workflow));
    }
    Self::bind_module_execution(module, run_id);
    Ok(())
}
/// Returns the shared task for `account_name` on `platform_name`, assembling it from the
/// repository when the in-memory cache has no live entry.
///
/// On a cache hit the cached `Arc<Task>` is returned as-is, so it still carries the run id
/// it was assembled with; callers that need `run_id` applied must rebind it on their own
/// copy, as `create_task_from_model` does.
///
/// On a miss the account, platform, modules, relations and middlewares are loaded, one
/// [`Module`] is built per stored module, login info is attached, and the result is cached.
/// A stored module is skipped (with a warning) when:
/// - its platform or account relation row is missing,
/// - no implementation is registered under its name, or
/// - the registered implementation's version differs from the stored one.
///
/// # Errors
/// Propagates repository failures and profile-loading failures.
async fn create_task_with_modules(
    &self,
    platform_name: &str,
    account_name: &str,
    run_id: Uuid,
) -> Result<Arc<Task>> {
    let start = Instant::now();
    let cache_key = format!("{account_name}-{platform_name}");
    if let Some(cached) = self.get_from_cache(&cache_key).await {
        return Ok(cached);
    }
    log::debug!(
        "create_task_with_modules: cache miss for {}, loading from DB",
        cache_key
    );
    let modules = self
        .repository
        .load_modules_by_account_platform(platform_name, account_name)
        .await?;
    let account = self.repository.load_account(account_name).await?;
    let platform = self.repository.load_platform(platform_name).await?;
    let rel_account_platform = self
        .repository
        .load_account_platform_relation(account.id, platform.id)
        .await?;
    if modules.is_empty() {
        // Nothing to assemble: cache an empty task so repeated lookups skip the repository.
        let mut task = Task {
            account,
            platform,
            login_info: None,
            modules: vec![],
            metadata: Default::default(),
            run_id,
            prefix_request: Default::default(),
        };
        task.login_info = self.login_info(&task.id()).await;
        let task = Arc::new(task);
        self.put_task_aliases(task.clone()).await;
        return Ok(task);
    }
    // Computed once and reused for every per-module relation query below.
    let module_ids: Vec<i32> = modules.iter().map(|m| m.id).collect();
    let module_data_middleware_map = self
        .repository
        .load_module_data_middleware_relations(&module_ids)
        .await?;
    let module_download_middleware_map = self
        .repository
        .load_module_download_middleware_relations(&module_ids)
        .await?;
    // Deduplicate middleware ids across modules so each middleware is loaded once.
    let all_data_middleware_ids: std::collections::HashSet<_> = module_data_middleware_map
        .values()
        .flatten()
        .map(|rel| rel.data_middleware_id)
        .collect();
    let all_download_middleware_ids: std::collections::HashSet<_> =
        module_download_middleware_map
            .values()
            .flatten()
            .map(|rel| rel.download_middleware_id)
            .collect();
    let all_data_middleware = if !all_data_middleware_ids.is_empty() {
        self.repository
            .load_data_middlewares(&all_data_middleware_ids.into_iter().collect::<Vec<_>>())
            .await?
    } else {
        vec![]
    };
    let all_download_middleware = if !all_download_middleware_ids.is_empty() {
        self.repository
            .load_download_middlewares(
                &all_download_middleware_ids.into_iter().collect::<Vec<_>>(),
            )
            .await?
    } else {
        vec![]
    };
    let rel_module_platform_map = self
        .repository
        .load_module_platform_relations(&module_ids, platform.id)
        .await?;
    let rel_module_account_map = self
        .repository
        .load_module_account_relations(&module_ids, account.id)
        .await?;
    let mut module_instances = Vec::new();
    for module in modules {
        let rel_module_platform = match rel_module_platform_map.get(&module.id) {
            Some(r) => r.clone(),
            None => {
                log::warn!("Missing platform relation for module {}", module.id);
                continue;
            }
        };
        let rel_module_account = match rel_module_account_map.get(&module.id) {
            Some(r) => r.clone(),
            None => {
                log::warn!("Missing account relation for module {}", module.id);
                continue;
            }
        };
        let rel_module_data_middleware = module_data_middleware_map
            .get(&module.id)
            .cloned()
            .unwrap_or_default();
        let rel_module_download_middleware = module_download_middleware_map
            .get(&module.id)
            .cloned()
            .unwrap_or_default();
        let data_middleware: Vec<_> = all_data_middleware
            .iter()
            .filter(|m| {
                rel_module_data_middleware
                    .iter()
                    .any(|rel| rel.data_middleware_id == m.id)
            })
            .cloned()
            .collect();
        let download_middleware: Vec<_> = all_download_middleware
            .iter()
            .filter(|m| {
                rel_module_download_middleware
                    .iter()
                    .any(|rel| rel.download_middleware_id == m.id)
            })
            .cloned()
            .collect();
        let module_config =
            ConfigAssembler::assemble_module_config(ModuleConfigAssemblyInput {
                account: &account,
                platform: &platform,
                module: &module,
                rel_account_platform: &rel_account_platform,
                rel_module_platform: &rel_module_platform,
                rel_module_account: &rel_module_account,
                data_middleware: &data_middleware,
                download_middleware: &download_middleware,
                rel_module_data_middleware: &rel_module_data_middleware,
                rel_module_download_middleware: &rel_module_download_middleware,
            });
        // Hold the assembler read guard only for the lookup itself, so pending writers are
        // not blocked while the profile is loaded below.
        let assembler = self.module_assembler.read().await;
        let registered = assembler.get_module(&module.name);
        drop(assembler);
        let Some(module_impl) = registered else {
            log::warn!(
                "No registered implementation for module {}; skipping",
                module.name
            );
            continue;
        };
        if module_impl.version() != module.version {
            log::warn!(
                "Registered implementation version differs from stored version for module {}; skipping",
                module.name
            );
            continue;
        }
        let loaded_profile = self
            .load_module_profile(
                &account.name,
                &platform.name,
                module_impl.name(),
                module_impl.clone(),
                &module_config,
            )
            .await?;
        // Copy the two settings needed and release the config guard before any further await.
        let app_config = self.state.config.read().await;
        // The per-module profile flag forces the locker on; otherwise the global setting applies.
        let locker = loaded_profile.snapshot.common.module_locker
            || app_config.download_config.enable_locker;
        let cache_ttl = app_config.cache.ttl;
        drop(app_config);
        let mut module_instance = Module {
            config: Arc::new(module_config),
            account: account.clone(),
            platform: platform.clone(),
            error_times: 0,
            finished: false,
            data_middleware: data_middleware.iter().map(|x| x.name.clone()).collect(),
            download_middleware: download_middleware.iter().map(|x| x.name.clone()).collect(),
            module: module_impl,
            locker,
            locker_ttl: 0,
            processor: ModuleDagProcessor::new(
                format!("{}-{}-{}", account.name, platform.name, module.name),
                self.state.cache_service.clone(),
                run_id,
                cache_ttl,
            ),
            dag_dispatcher: None,
            run_id,
            prefix_request: Default::default(),
            pending_ctx: None,
            bound_task_meta: None,
            bound_login_info: None,
            profile: Some(Arc::new(loaded_profile.snapshot)),
            workflow: Some(Arc::new(loaded_profile.workflow)),
        };
        module_instance.add_step().await;
        Self::bind_module_execution(&mut module_instance, run_id);
        module_instances.push(module_instance);
    }
    let mut task = Task {
        account,
        platform,
        login_info: None,
        modules: module_instances,
        metadata: Default::default(),
        run_id,
        prefix_request: Default::default(),
    };
    task.login_info = self.login_info(&task.id()).await;
    let task = Arc::new(task);
    self.put_task_aliases(task.clone()).await;
    log::debug!(
        "create_task_with_modules: loaded from DB for {}, took {:?}",
        cache_key,
        start.elapsed()
    );
    Ok(task)
}
pub async fn load_with_model(&self, task_model: &TaskEvent) -> Result<Task> {
let task = self.create_task_from_model(task_model).await;
match task {
Ok(mut task) => {
task.prefix_request = Uuid::nil();
for module in task.modules.iter_mut() {
module.prefix_request = Uuid::nil();
module
.config
.send(&module.id(), &self.cache_service)
.await
.ok();
}
Ok(task)
}
Err(e) => Err(ModuleNotFound(
format!(
"{}-{}-{:?} not found with error: {}",
task_model.platform, task_model.account, task_model.module, e
)
.into(),
))?,
}
}
/// Rebuilds the task for a parser dispatch seed.
///
/// The task takes the seed's prefix request, run id and metadata. Every module is rebound
/// to the seed's run id, receives the seed's prefix request and a copy of its context as
/// pending context, and is refreshed from the cached module configuration.
async fn load_parser_seed(&self, seed: &ParserDispatchSeed) -> Result<Task> {
    let run_id = seed.run_id;
    let prefix_request = seed.prefix_request;
    let mut task = self.create_task_from_model(&seed.task_model).await?;
    task.prefix_request = prefix_request;
    task.run_id = run_id;
    for module in task.modules.iter_mut() {
        Self::bind_module_execution(module, run_id);
    }
    task.metadata = seed.metadata.clone();
    for module in task.modules.iter_mut() {
        module.prefix_request = prefix_request;
        module.pending_ctx = Some(seed.context.clone());
        self.refresh_module_runtime_from_cache(module, run_id)
            .await?;
    }
    Ok(task)
}
/// Extracts the parser seed from `dispatch` and rebuilds the corresponding task.
///
/// Fails when the seed cannot be extracted or the task cannot be rebuilt from it.
pub async fn load_parser_dispatch(&self, dispatch: &NodeDispatchEnvelope) -> Result<Task> {
    let seed = extract_parser_dispatch_seed(dispatch)?;
    let task = self.load_parser_seed(&seed).await?;
    Ok(task)
}
/// Rebuilds the task for an error envelope seed.
///
/// The task takes the seed's prefix request, run id and metadata. Every module is first
/// rebound to the seed's run id and given the seed's prefix request and a copy of its
/// context as pending context; each module is then refreshed from the cached module
/// configuration.
async fn load_error_seed(&self, seed: &ErrorEnvelopeSeed) -> Result<Task> {
    let run_id = seed.run_id;
    let prefix_request = seed.prefix_request;
    let mut task = self.create_task_from_model(&seed.task_model).await?;
    task.prefix_request = prefix_request;
    task.run_id = run_id;
    for module in task.modules.iter_mut() {
        Self::bind_module_execution(module, run_id);
        module.prefix_request = prefix_request;
        module.pending_ctx = Some(seed.context.clone());
    }
    for module in task.modules.iter_mut() {
        self.refresh_module_runtime_from_cache(module, run_id)
            .await?;
    }
    task.metadata = seed.metadata.clone();
    Ok(task)
}
/// Extracts the error seed from `envelope` and rebuilds the corresponding task.
///
/// Fails when the seed cannot be extracted or the task cannot be rebuilt from it.
pub async fn load_error_envelope(&self, envelope: &NodeErrorEnvelope) -> Result<Task> {
    let seed = extract_error_envelope_seed(envelope)?;
    let task = self.load_error_seed(&seed).await?;
    Ok(task)
}
pub async fn load_with_response(&self, response: &Response) -> Result<Task> {
self.create_task_with_modules(&response.platform, &response.account, response.run_id)
.await
.map(|t| {
let mut t = (*t).clone();
t.modules.retain(|m| m.module.name() == response.module);
t
})
}
/// Returns the single module named in `response`, refreshed from the cached module
/// configuration and bound to the response's run id, together with the task's login info.
///
/// # Errors
/// Returns `ModuleNotFound` when the task has no module with that name; also propagates
/// task-assembly and refresh failures.
pub async fn load_module_with_response(
    &self,
    response: &Response,
) -> Result<(Arc<Module>, Option<LoginInfo>)> {
    let task = self
        .create_task_with_modules(&response.platform, &response.account, response.run_id)
        .await?;
    let wanted = task
        .modules
        .iter()
        .find(|m| m.module.name() == response.module);
    let Some(found) = wanted else {
        return Err(
            ModuleNotFound(format!("Module {} not found in task", response.module).into())
                .into(),
        );
    };
    // Work on a private copy so the shared (cached) task is left untouched.
    let mut module = found.clone();
    self.refresh_module_runtime_from_cache(&mut module, response.run_id)
        .await?;
    Ok((Arc::new(module), task.login_info.clone()))
}
/// Returns the cached task stored under `key` if its entry has not expired.
///
/// An expired entry is evicted. The eviction re-checks the expiry while removing, so an
/// entry that another caller refreshed after this lookup released its reference is kept.
async fn get_from_cache(&self, key: &str) -> Option<Arc<Task>> {
    let now = Instant::now();
    {
        let entry = self.cache.get(key)?;
        if now < entry.expires_at {
            return Some(Arc::clone(&entry.task));
        }
        // The map reference is released at the end of this scope, before the removal below
        // touches the same key.
    }
    self.cache.remove_if(key, |_, entry| now >= entry.expires_at);
    None
}
/// Removes every entry from the in-memory task cache, so the next lookup for any key is a miss.
pub async fn clear_cache(&self) {
self.cache.clear();
}
/// Stores `task` in the in-memory cache under its own id, valid for `CACHE_TTL` from now.
/// An existing entry under the same id is replaced.
async fn put_task_aliases(&self, task: Arc<Task>) {
    let expires_at = Instant::now() + CACHE_TTL;
    let key = task.id();
    self.cache.insert(key, CacheEntry { task, expires_at });
}
}