use crate::dal::DAL;
use crate::database::UniversalUuid;
use crate::error::ExecutorError;
use crate::Database;
use std::collections::HashMap;
use tokio::sync::RwLock;
/// Identifiers describing where in a pipeline run some piece of work belongs.
///
/// The pipeline execution id is always present; the task-level fields are
/// optional, so a scope can describe either a whole pipeline execution or a
/// single task within it.
#[derive(Debug, Clone)]
pub struct ExecutionScope {
    /// Id of the pipeline execution this scope refers to.
    pub pipeline_execution_id: UniversalUuid,
    /// Id of the task execution, when the scope is narrowed to one task.
    pub task_execution_id: Option<UniversalUuid>,
    /// Name of the task, when the scope is narrowed to one task.
    pub task_name: Option<String>,
}
/// Looks up context values produced by a task's dependencies, caching each
/// dependency's context data after the first load.
#[derive(Debug)]
pub struct DependencyLoader {
    /// Database handle used to build the data-access layer for each load.
    database: Database,
    /// Pipeline execution whose task metadata is queried.
    pipeline_execution_id: UniversalUuid,
    /// Dependencies to search; lookups walk this list from last to first.
    dependency_tasks: Vec<crate::task::TaskNamespace>,
    /// Per-dependency cache of context data, keyed by the dependency's
    /// namespace rendered as a string.
    loaded_contexts: RwLock<HashMap<String, HashMap<String, serde_json::Value>>>,
}
impl DependencyLoader {
pub fn new(
database: Database,
pipeline_execution_id: UniversalUuid,
dependency_tasks: Vec<crate::task::TaskNamespace>,
) -> Self {
Self {
database,
pipeline_execution_id,
dependency_tasks,
loaded_contexts: RwLock::new(HashMap::new()),
}
}
pub async fn load_from_dependencies(
&self,
key: &str,
) -> Result<Option<serde_json::Value>, ExecutorError> {
for dep_task_namespace in self.dependency_tasks.iter().rev() {
let dep_task_name = dep_task_namespace.to_string();
{
let cache = self.loaded_contexts.read().await;
if let Some(context_data) = cache.get(&dep_task_name) {
if let Some(value) = context_data.get(key) {
return Ok(Some(value.clone())); }
}
}
{
let mut cache = self.loaded_contexts.write().await;
if !cache.contains_key(&dep_task_name) {
let dep_context_data = self
.load_dependency_context_data(dep_task_namespace)
.await?;
cache.insert(dep_task_name.clone(), dep_context_data);
}
if let Some(context_data) = cache.get(&dep_task_name) {
if let Some(value) = context_data.get(key) {
return Ok(Some(value.clone())); }
}
}
}
Ok(None) }
async fn load_dependency_context_data(
&self,
task_namespace: &crate::task::TaskNamespace,
) -> Result<HashMap<String, serde_json::Value>, ExecutorError> {
let dal = DAL::new(self.database.clone());
let task_metadata = dal
.task_execution_metadata()
.get_by_pipeline_and_task(self.pipeline_execution_id, task_namespace)
.await?;
if let Some(context_id) = task_metadata.context_id {
let context = dal.context().read::<serde_json::Value>(context_id).await?;
Ok(context.data().clone())
} else {
Ok(HashMap::new())
}
}
}
/// Tunable limits for the executor.
///
/// How these limits are enforced is up to the code consuming this config,
/// which is not part of this section.
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
    /// Upper bound on the number of tasks running at the same time.
    pub max_concurrent_tasks: usize,
    /// Time budget for a single task.
    pub task_timeout: std::time::Duration,
}

impl Default for ExecutorConfig {
    /// Defaults to 4 concurrent tasks and a 300-second (5-minute) task timeout.
    fn default() -> Self {
        let max_concurrent_tasks = 4;
        let task_timeout = std::time::Duration::from_secs(5 * 60);
        ExecutorConfig {
            max_concurrent_tasks,
            task_timeout,
        }
    }
}
/// Identifying details of a task execution that has been claimed.
#[derive(Debug)]
pub struct ClaimedTask {
    /// Id of the claimed task execution.
    pub task_execution_id: UniversalUuid,
    /// Id of the pipeline execution the task belongs to.
    pub pipeline_execution_id: UniversalUuid,
    /// Name of the task.
    pub task_name: String,
    /// Attempt number for this execution. Whether counting starts at 0 or 1
    /// is not visible here; verify against the code that produces it.
    pub attempt: i32,
}