use crate::dal::DAL;
use crate::database::UniversalUuid;
use crate::error::ExecutorError;
use crate::Database;
use std::collections::HashMap;
use tokio::sync::RwLock;
/// Identifies where in a workflow run a piece of work is executing.
///
/// The workflow execution is always known; the task-level fields are optional
/// so a scope can also be built with only the workflow identifier.
#[derive(Clone, Debug)]
pub struct ExecutionScope {
    /// Identifier of the workflow execution this scope belongs to.
    pub workflow_execution_id: UniversalUuid,
    /// Identifier of the task execution, when the scope is tied to one.
    pub task_execution_id: Option<UniversalUuid>,
    /// Name of the task, when the scope is tied to one.
    pub task_name: Option<String>,
}
/// Looks up context values produced by a task's dependencies, caching each
/// dependency's context data after it has been read from the database once.
#[derive(Debug)]
pub struct DependencyLoader {
    /// Database handle used to build a `DAL` when a dependency must be loaded.
    database: Database,
    /// Workflow execution whose task metadata is queried.
    workflow_execution_id: UniversalUuid,
    /// Dependency tasks to search; lookups walk this list in reverse order.
    dependency_tasks: Vec<crate::task::TaskNamespace>,
    /// Cache of already-loaded context data, keyed by the string form of the
    /// dependency's `TaskNamespace`.
    loaded_contexts: RwLock<HashMap<String, HashMap<String, serde_json::Value>>>,
}
impl DependencyLoader {
pub fn new(
database: Database,
workflow_execution_id: UniversalUuid,
dependency_tasks: Vec<crate::task::TaskNamespace>,
) -> Self {
Self {
database,
workflow_execution_id,
dependency_tasks,
loaded_contexts: RwLock::new(HashMap::new()),
}
}
pub async fn load_from_dependencies(
&self,
key: &str,
) -> Result<Option<serde_json::Value>, ExecutorError> {
for dep_task_namespace in self.dependency_tasks.iter().rev() {
let dep_task_name = dep_task_namespace.to_string();
{
let cache = self.loaded_contexts.read().await;
if let Some(context_data) = cache.get(&dep_task_name) {
if let Some(value) = context_data.get(key) {
return Ok(Some(value.clone())); }
}
}
{
let mut cache = self.loaded_contexts.write().await;
if !cache.contains_key(&dep_task_name) {
let dep_context_data = self
.load_dependency_context_data(dep_task_namespace)
.await?;
cache.insert(dep_task_name.clone(), dep_context_data);
}
if let Some(context_data) = cache.get(&dep_task_name) {
if let Some(value) = context_data.get(key) {
return Ok(Some(value.clone())); }
}
}
}
Ok(None) }
async fn load_dependency_context_data(
&self,
task_namespace: &crate::task::TaskNamespace,
) -> Result<HashMap<String, serde_json::Value>, ExecutorError> {
let dal = DAL::new(self.database.clone());
let task_metadata = dal
.task_execution_metadata()
.get_by_workflow_and_task(self.workflow_execution_id, task_namespace)
.await?;
if let Some(context_id) = task_metadata.context_id {
let context = dal.context().read::<serde_json::Value>(context_id).await?;
Ok(context.data().clone())
} else {
Ok(HashMap::new())
}
}
}
/// Tunable settings for the executor.
///
/// The `Default` implementation yields 4 concurrent tasks, a 300 second task
/// timeout, claiming enabled, and a 10 second heartbeat interval.
#[derive(Clone, Debug)]
pub struct ExecutorConfig {
    /// Number of tasks allowed to run at once.
    /// NOTE(review): presumably an upper bound enforced by the executor;
    /// enforcement is not visible in this file.
    pub max_concurrent_tasks: usize,
    /// Time limit associated with a single task.
    pub task_timeout: std::time::Duration,
    /// Whether task claiming is enabled.
    pub enable_claiming: bool,
    /// Interval between heartbeats.
    pub heartbeat_interval: std::time::Duration,
}

impl Default for ExecutorConfig {
    /// Builds the stock configuration: 4 tasks, 300 s timeout, claiming on,
    /// 10 s heartbeat.
    fn default() -> Self {
        let task_timeout = std::time::Duration::from_secs(300);
        let heartbeat_interval = std::time::Duration::from_secs(10);

        ExecutorConfig {
            max_concurrent_tasks: 4,
            task_timeout,
            enable_claiming: true,
            heartbeat_interval,
        }
    }
}
/// A task execution that has been claimed, together with the identifiers
/// needed to locate it.
#[derive(Debug)]
pub struct ClaimedTask {
    /// Identifier of the claimed task execution.
    pub task_execution_id: UniversalUuid,
    /// Identifier of the workflow execution the task belongs to.
    pub workflow_execution_id: UniversalUuid,
    /// Name of the task (the tests in this file use `::`-separated names such
    /// as `tenant::pkg::wf::my_task`).
    pub task_name: String,
    /// Attempt counter for this task execution.
    pub attempt: i32,
}
#[cfg(test)]
mod tests {
    //! Unit tests for the plain data types in this file. None of them touch
    //! the database; `DependencyLoader` is only checked at the type level.

    use super::*;
    use std::time::Duration;

    // ---- ExecutionScope ----------------------------------------------------

    #[test]
    fn test_execution_scope_full() {
        let wf_exec_id = UniversalUuid::new_v4();
        let task_id = UniversalUuid::new_v4();

        let scope = ExecutionScope {
            workflow_execution_id: wf_exec_id,
            task_execution_id: Some(task_id),
            task_name: Some("my_task".to_string()),
        };

        assert_eq!(scope.workflow_execution_id, wf_exec_id);
        assert_eq!(scope.task_execution_id, Some(task_id));
        assert_eq!(scope.task_name.as_deref(), Some("my_task"));
    }

    #[test]
    fn test_execution_scope_minimal() {
        let wf_exec_id = UniversalUuid::new_v4();

        let scope = ExecutionScope {
            workflow_execution_id: wf_exec_id,
            task_execution_id: None,
            task_name: None,
        };

        assert!(scope.task_execution_id.is_none());
        assert!(scope.task_name.is_none());
    }

    #[test]
    fn test_execution_scope_clone() {
        let scope = ExecutionScope {
            workflow_execution_id: UniversalUuid::new_v4(),
            task_execution_id: Some(UniversalUuid::new_v4()),
            task_name: Some("cloned_task".to_string()),
        };

        let cloned = scope.clone();

        assert_eq!(cloned.workflow_execution_id, scope.workflow_execution_id);
        assert_eq!(cloned.task_execution_id, scope.task_execution_id);
        assert_eq!(cloned.task_name, scope.task_name);
    }

    #[test]
    fn test_execution_scope_debug() {
        let scope = ExecutionScope {
            workflow_execution_id: UniversalUuid::new_v4(),
            task_execution_id: None,
            task_name: Some("debug_task".to_string()),
        };

        let debug_str = format!("{:?}", scope);

        assert!(debug_str.contains("ExecutionScope"));
        assert!(debug_str.contains("debug_task"));
    }

    // ---- ExecutorConfig ----------------------------------------------------

    #[test]
    fn test_executor_config_default() {
        let config = ExecutorConfig::default();

        assert_eq!(config.max_concurrent_tasks, 4);
        assert_eq!(config.task_timeout, Duration::from_secs(300));
        assert!(config.enable_claiming);
        assert_eq!(config.heartbeat_interval, Duration::from_secs(10));
    }

    #[test]
    fn test_executor_config_custom() {
        let config = ExecutorConfig {
            max_concurrent_tasks: 16,
            task_timeout: Duration::from_secs(60),
            enable_claiming: false,
            heartbeat_interval: Duration::from_secs(5),
        };

        assert_eq!(config.max_concurrent_tasks, 16);
        assert_eq!(config.task_timeout, Duration::from_secs(60));
        assert!(!config.enable_claiming);
        assert_eq!(config.heartbeat_interval, Duration::from_secs(5));
    }

    #[test]
    fn test_executor_config_clone() {
        let config = ExecutorConfig {
            max_concurrent_tasks: 8,
            task_timeout: Duration::from_secs(120),
            enable_claiming: true,
            heartbeat_interval: Duration::from_secs(15),
        };

        let cloned = config.clone();

        assert_eq!(cloned.max_concurrent_tasks, config.max_concurrent_tasks);
        assert_eq!(cloned.task_timeout, config.task_timeout);
        assert_eq!(cloned.enable_claiming, config.enable_claiming);
        assert_eq!(cloned.heartbeat_interval, config.heartbeat_interval);
    }

    #[test]
    fn test_executor_config_debug() {
        let config = ExecutorConfig::default();

        let debug_str = format!("{:?}", config);

        assert!(debug_str.contains("ExecutorConfig"));
        // Match the field together with its value: a bare "4" would also be
        // satisfied by any other digit 4 that happened to appear in the output.
        assert!(debug_str.contains("max_concurrent_tasks: 4"));
    }

    // ---- ClaimedTask -------------------------------------------------------

    #[test]
    fn test_claimed_task_construction() {
        let task_exec_id = UniversalUuid::new_v4();
        let wf_exec_id = UniversalUuid::new_v4();

        let task = ClaimedTask {
            task_execution_id: task_exec_id,
            workflow_execution_id: wf_exec_id,
            task_name: "tenant::pkg::wf::my_task".to_string(),
            attempt: 1,
        };

        assert_eq!(task.task_execution_id, task_exec_id);
        assert_eq!(task.workflow_execution_id, wf_exec_id);
        assert_eq!(task.task_name, "tenant::pkg::wf::my_task");
        assert_eq!(task.attempt, 1);
    }

    #[test]
    fn test_claimed_task_retry_attempt() {
        let task = ClaimedTask {
            task_execution_id: UniversalUuid::new_v4(),
            workflow_execution_id: UniversalUuid::new_v4(),
            task_name: "t::p::w::task".to_string(),
            attempt: 3,
        };

        assert_eq!(task.attempt, 3);
    }

    #[test]
    fn test_claimed_task_debug() {
        let task = ClaimedTask {
            task_execution_id: UniversalUuid::new_v4(),
            workflow_execution_id: UniversalUuid::new_v4(),
            task_name: "t::p::w::debug_task".to_string(),
            attempt: 0,
        };

        let debug_str = format!("{:?}", task);

        assert!(debug_str.contains("ClaimedTask"));
        assert!(debug_str.contains("debug_task"));
    }

    // ---- DependencyLoader --------------------------------------------------

    /// Compile-time trait checks only: constructing a `DependencyLoader`
    /// needs a `Database`, which these unit tests do not set up.
    #[test]
    fn test_dependency_loader_debug() {
        fn assert_send_sync<T: Send + Sync>() {}
        fn assert_debug<T: std::fmt::Debug>() {}

        assert_send_sync::<DependencyLoader>();
        assert_debug::<DependencyLoader>();
    }
}