use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, Mutex};
use thiserror::Error;
use uuid::Uuid;
use crate::base::component::action::{
Action, ActionError, ActionPriority, ActionResult, ActionStatus,
};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TaskExecutionMode {
OneTime,
Retryable,
Idempotent,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
pub action: Action,
pub execution_mode: TaskExecutionMode,
pub service_name: String,
pub task_metadata: std::collections::HashMap<String, serde_json::Value>,
}
impl Task {
pub fn new(name: String, description: String, service_name: String) -> Self {
let action = Action::new(
name.clone(),
description,
"task_manager".to_string(),
service_name.clone(),
);
Self {
action,
execution_mode: TaskExecutionMode::Retryable,
service_name,
task_metadata: std::collections::HashMap::new(),
}
}
pub fn new_with_config(
name: String,
description: String,
service_name: String,
execution_mode: TaskExecutionMode,
priority: ActionPriority,
) -> Self {
let action = Action::new(
name.clone(),
description,
"task_manager".to_string(),
service_name.clone(),
)
.with_priority(priority);
Self {
action,
execution_mode,
service_name,
task_metadata: std::collections::HashMap::new(),
}
}
pub fn with_execution_mode(mut self, mode: TaskExecutionMode) -> Self {
self.execution_mode = mode;
self
}
pub fn with_priority(mut self, priority: ActionPriority) -> Self {
self.action = self.action.with_priority(priority);
self
}
pub fn with_timeout(mut self, timeout_seconds: u32) -> Self {
self.action = self.action.with_timeout(timeout_seconds);
self
}
pub fn add_metadata<T: Serialize>(&mut self, key: String, value: T) -> Result<(), TaskError> {
let json_value = serde_json::to_value(value)
.map_err(|e| TaskError::SerializationError(e.to_string()))?;
self.task_metadata.insert(key, json_value);
Ok(())
}
pub fn get_metadata(&self, key: &str) -> Option<&serde_json::Value> {
self.task_metadata.get(key)
}
pub async fn execute<T: TaskService + ?Sized>(&mut self, service: &T) -> Result<(), TaskError> {
if !self.action.can_execute() {
return Err(TaskError::InvalidState(self.action.status.clone()));
}
self.action.start_execution();
let start_time = std::time::Instant::now();
let execution_result = service.execute(&self.action).await;
let duration_ms = start_time.elapsed().as_millis() as u64;
match execution_result {
Ok(result_data) => {
let action_result = ActionResult {
success: true,
duration_ms,
data: result_data,
error: None,
metadata: std::collections::HashMap::new(),
};
self.action.complete_execution(action_result);
Ok(())
}
Err(task_error) => {
let error_message = task_error.to_string();
let can_retry = self.action.fail_execution(error_message, duration_ms);
if can_retry && matches!(self.execution_mode, TaskExecutionMode::Retryable) {
Err(TaskError::RetryableFailure(task_error.to_string()))
} else {
Err(task_error)
}
}
}
}
pub async fn execute_simple(&mut self) -> Result<(), TaskError> {
if !self.action.can_execute() {
return Err(TaskError::InvalidState(self.action.status.clone()));
}
self.action.start_execution();
let start_time = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let duration_ms = start_time.elapsed().as_millis() as u64;
let action_result = ActionResult {
success: true,
duration_ms,
data: Some(serde_json::json!({"message": "Task completed successfully"})),
error: None,
metadata: std::collections::HashMap::new(),
};
self.action.complete_execution(action_result);
Ok(())
}
pub fn cancel(&mut self) {
self.action.cancel();
}
pub fn reset(&mut self) {
self.action.reset();
}
pub fn can_execute(&self) -> bool {
self.action.can_execute()
}
pub fn is_complete(&self) -> bool {
self.action.is_terminal()
}
pub fn status(&self) -> &ActionStatus {
&self.action.status
}
pub fn id(&self) -> Uuid {
self.action.id
}
pub fn name(&self) -> &str {
&self.action.name
}
pub fn execution_stats(&self) -> TaskStats {
TaskStats {
execution_count: self.action.execution_count,
success_rate: self.action.success_rate(),
average_duration_ms: self.action.average_duration_ms(),
last_execution: self.action.last_execution,
}
}
pub fn clone_for_new_execution(&self) -> Self {
let mut cloned = self.clone();
cloned.action = self.action.clone_for_new_execution();
cloned
}
}
impl PartialEq for Task {
fn eq(&self, other: &Self) -> bool {
self.action.id == other.action.id
}
}
impl Eq for Task {}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskStats {
pub execution_count: u32,
pub success_rate: f64,
pub average_duration_ms: Option<u64>,
pub last_execution: Option<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Error)]
pub enum TaskError {
#[error("Task execution failed: {0}")]
ExecutionFailed(String),
#[error("Service unavailable: {0}")]
ServiceUnavailable(String),
#[error("Task execution timed out")]
Timeout,
#[error("Task failed but can be retried: {0}")]
RetryableFailure(String),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Task is not in a valid state for execution: {0:?}")]
InvalidState(ActionStatus),
#[error("Action error: {0}")]
ActionError(#[from] ActionError),
}
#[async_trait]
pub trait TaskService: Debug + Send + Sync {
fn name(&self) -> &str;
async fn execute(&self, action: &Action) -> Result<Option<serde_json::Value>, TaskError>;
fn can_handle(&self, task: &Task) -> bool {
task.service_name == self.name()
}
fn clone_service(&self) -> Box<dyn TaskService>;
}
fn safe_relative_name(name: &str) -> Result<PathBuf, TaskError> {
let candidate = Path::new(name);
if candidate.as_os_str().is_empty() {
return Err(TaskError::ExecutionFailed(
"artifact name must not be empty".to_string(),
));
}
if candidate.is_absolute() {
return Err(TaskError::ExecutionFailed(
"artifact name must be relative".to_string(),
));
}
for component in candidate.components() {
if !matches!(component, Component::Normal(_)) {
return Err(TaskError::ExecutionFailed(
"artifact name must not traverse directories".to_string(),
));
}
}
Ok(candidate.to_path_buf())
}
#[derive(Debug, Clone)]
pub struct DataBackupService {
pub backup_path: String,
}
impl DataBackupService {
pub fn new(backup_path: impl Into<String>) -> Self {
Self {
backup_path: backup_path.into(),
}
}
}
#[async_trait]
impl TaskService for DataBackupService {
fn name(&self) -> &str {
"DataBackupService"
}
async fn execute(&self, action: &Action) -> Result<Option<serde_json::Value>, TaskError> {
let base = Path::new(&self.backup_path);
tokio::fs::create_dir_all(base).await.map_err(|e| {
TaskError::ExecutionFailed(format!("failed to prepare backup dir: {e}"))
})?;
let file_name = match action.get_argument("backup_name").and_then(|v| v.as_str()) {
Some(name) => safe_relative_name(name)?,
None => PathBuf::from(format!("{}.backup.json", action.id)),
};
let target = base.join(&file_name);
let payload = match action.get_argument("payload") {
Some(value) => serde_json::to_vec_pretty(value),
None => serde_json::to_vec_pretty(&serde_json::json!({
"task_id": action.id,
"task_name": action.name,
"description": action.description,
"backed_up_at": chrono::Utc::now(),
})),
}
.map_err(|e| TaskError::SerializationError(e.to_string()))?;
let bytes_written = payload.len();
tokio::fs::write(&target, &payload)
.await
.map_err(|e| TaskError::ExecutionFailed(format!("backup write failed: {e}")))?;
Ok(Some(serde_json::json!({
"backup_path": target.to_string_lossy(),
"bytes_written": bytes_written,
"status": "completed",
"timestamp": chrono::Utc::now(),
})))
}
fn clone_service(&self) -> Box<dyn TaskService> {
Box::new(self.clone())
}
}
#[derive(Debug, Clone)]
pub struct ContentIndexingService {
pub index_name: String,
}
impl ContentIndexingService {
pub fn new(index_name: impl Into<String>) -> Self {
Self {
index_name: index_name.into(),
}
}
fn index_dir() -> PathBuf {
std::env::temp_dir().join("paladin_index")
}
}
#[async_trait]
impl TaskService for ContentIndexingService {
fn name(&self) -> &str {
"ContentIndexingService"
}
async fn execute(&self, action: &Action) -> Result<Option<serde_json::Value>, TaskError> {
let safe_name = safe_relative_name(&self.index_name)?;
let dir = Self::index_dir();
tokio::fs::create_dir_all(&dir)
.await
.map_err(|e| TaskError::ExecutionFailed(format!("failed to prepare index dir: {e}")))?;
let content = action
.get_argument("content")
.and_then(|v| v.as_str())
.unwrap_or(action.description.as_str());
let mut term_counts: BTreeMap<String, usize> = BTreeMap::new();
for term in content.split_whitespace() {
*term_counts.entry(term.to_lowercase()).or_insert(0) += 1;
}
let artifact = serde_json::json!({
"index_name": self.index_name,
"task_id": action.id,
"terms": term_counts,
"indexed_at": chrono::Utc::now(),
});
let bytes = serde_json::to_vec_pretty(&artifact)
.map_err(|e| TaskError::SerializationError(e.to_string()))?;
let target = dir.join(format!(
"{}-{}.index.json",
safe_name.to_string_lossy(),
action.id
));
tokio::fs::write(&target, &bytes)
.await
.map_err(|e| TaskError::ExecutionFailed(format!("index write failed: {e}")))?;
Ok(Some(serde_json::json!({
"index_name": self.index_name,
"index_path": target.to_string_lossy(),
"terms_indexed": term_counts.len(),
"status": "indexed",
"timestamp": chrono::Utc::now(),
})))
}
fn clone_service(&self) -> Box<dyn TaskService> {
Box::new(self.clone())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmailMessage {
pub to: String,
pub subject: String,
pub body: String,
}
#[async_trait]
pub trait EmailSink: Debug + Send + Sync {
async fn deliver(&self, message: &EmailMessage) -> Result<(), TaskError>;
fn clone_sink(&self) -> Arc<dyn EmailSink>;
}
#[derive(Debug, Clone, Default)]
pub struct InMemoryEmailSink {
delivered: Arc<Mutex<Vec<EmailMessage>>>,
}
impl InMemoryEmailSink {
pub fn new() -> Self {
Self::default()
}
pub fn delivered(&self) -> Vec<EmailMessage> {
self.delivered
.lock()
.map(|guard| guard.clone())
.unwrap_or_default()
}
}
#[async_trait]
impl EmailSink for InMemoryEmailSink {
async fn deliver(&self, message: &EmailMessage) -> Result<(), TaskError> {
self.delivered
.lock()
.map_err(|e| TaskError::ExecutionFailed(format!("sink poisoned: {e}")))?
.push(message.clone());
Ok(())
}
fn clone_sink(&self) -> Arc<dyn EmailSink> {
Arc::new(self.clone())
}
}
#[derive(Debug, Clone)]
pub struct EmailNotificationService {
pub smtp_server: String,
sink: Arc<dyn EmailSink>,
}
impl EmailNotificationService {
pub fn new(smtp_server: impl Into<String>) -> Self {
Self {
smtp_server: smtp_server.into(),
sink: Arc::new(InMemoryEmailSink::new()),
}
}
pub fn with_sink(smtp_server: impl Into<String>, sink: Arc<dyn EmailSink>) -> Self {
Self {
smtp_server: smtp_server.into(),
sink,
}
}
}
#[async_trait]
impl TaskService for EmailNotificationService {
fn name(&self) -> &str {
"EmailNotificationService"
}
async fn execute(&self, action: &Action) -> Result<Option<serde_json::Value>, TaskError> {
let to = action
.get_argument("to_email")
.and_then(|v| v.as_str())
.unwrap_or("unknown@example.com")
.to_string();
let subject = action
.get_argument("subject")
.and_then(|v| v.as_str())
.unwrap_or("(no subject)")
.to_string();
let body = action
.get_argument("body")
.and_then(|v| v.as_str())
.unwrap_or(action.description.as_str())
.to_string();
let message = EmailMessage {
to: to.clone(),
subject,
body,
};
self.sink.deliver(&message).await?;
Ok(Some(serde_json::json!({
"smtp_server": self.smtp_server,
"to_email": to,
"status": "sent",
"timestamp": chrono::Utc::now(),
})))
}
fn clone_service(&self) -> Box<dyn TaskService> {
Box::new(self.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[tokio::test]
async fn test_task_creation() {
let task = Task::new(
"Test Task".to_string(),
"A test task".to_string(),
"TestService".to_string(),
);
assert_eq!(task.name(), "Test Task");
assert_eq!(task.service_name, "TestService");
assert_eq!(task.execution_mode, TaskExecutionMode::Retryable);
assert!(task.can_execute());
}
#[tokio::test]
async fn test_task_execution_with_service() {
let mut task = Task::new(
"Backup Task".to_string(),
"Backup data task".to_string(),
"DataBackupService".to_string(),
);
let temp = std::env::temp_dir().join(format!("paladin_backup_test_{}", Uuid::new_v4()));
let service = DataBackupService::new(temp.to_string_lossy().to_string());
let result = task.execute(&service).await;
assert!(result.is_ok());
assert_eq!(task.status(), &ActionStatus::Completed);
assert_eq!(task.action.execution_count, 1);
}
#[tokio::test]
async fn test_task_simple_execution() {
let mut task = Task::new(
"Simple Task".to_string(),
"A simple task".to_string(),
"SimpleService".to_string(),
);
let result = task.execute_simple().await;
assert!(result.is_ok());
assert_eq!(task.status(), &ActionStatus::Completed);
}
#[tokio::test]
async fn test_task_with_metadata() {
let mut task = Task::new(
"Metadata Task".to_string(),
"Task with metadata".to_string(),
"TestService".to_string(),
);
task.add_metadata("custom_field".to_string(), "custom_value")
.unwrap();
task.add_metadata("priority_level".to_string(), 5).unwrap();
assert_eq!(
task.get_metadata("custom_field"),
Some(&json!("custom_value"))
);
assert_eq!(task.get_metadata("priority_level"), Some(&json!(5)));
}
#[tokio::test]
async fn test_task_configuration() {
let task = Task::new_with_config(
"Config Task".to_string(),
"Configured task".to_string(),
"ConfigService".to_string(),
TaskExecutionMode::Idempotent,
ActionPriority::High,
)
.with_timeout(30);
assert_eq!(task.execution_mode, TaskExecutionMode::Idempotent);
assert_eq!(task.action.priority, ActionPriority::High);
assert_eq!(task.action.timeout_seconds, Some(30));
}
#[tokio::test]
async fn test_task_statistics() {
let mut task = Task::new(
"Stats Task".to_string(),
"Task for statistics".to_string(),
"StatsService".to_string(),
);
task.execute_simple().await.unwrap();
let stats = task.execution_stats();
assert_eq!(stats.execution_count, 1);
assert_eq!(stats.success_rate, 100.0);
assert!(stats.average_duration_ms.is_some());
assert!(stats.last_execution.is_some());
}
#[tokio::test]
async fn test_task_clone_for_new_execution() {
let mut original = Task::new(
"Original Task".to_string(),
"Original task".to_string(),
"TestService".to_string(),
);
original.execute_simple().await.unwrap();
let cloned = original.clone_for_new_execution();
assert_ne!(original.id(), cloned.id());
assert_eq!(cloned.status(), &ActionStatus::Pending);
assert_eq!(cloned.action.execution_count, 0);
}
#[tokio::test]
async fn test_email_notification_service() {
let mut task = Task::new(
"Email Task".to_string(),
"Send email notification".to_string(),
"EmailNotificationService".to_string(),
);
task.action
.add_argument("to_email".to_string(), "user@example.com")
.unwrap();
task.action
.add_argument("subject".to_string(), "Test Subject")
.unwrap();
let service = EmailNotificationService::new("smtp.example.com");
let result = task.execute(&service).await;
assert!(result.is_ok());
assert_eq!(task.status(), &ActionStatus::Completed);
}
#[tokio::test]
async fn test_data_backup_service_writes_artifact() {
let dir = std::env::temp_dir().join(format!("paladin_backup_{}", Uuid::new_v4()));
let service = DataBackupService::new(dir.to_string_lossy().to_string());
let action = Action::new(
"backup".to_string(),
"nightly backup".to_string(),
"task_manager".to_string(),
"DataBackupService".to_string(),
);
let result = service.execute(&action).await.unwrap().unwrap();
let path = result["backup_path"].as_str().unwrap();
assert!(
std::path::Path::new(path).exists(),
"backup file must exist"
);
assert!(result["bytes_written"].as_u64().unwrap() > 0);
assert_eq!(result["status"], "completed");
let _ = std::fs::remove_dir_all(&dir);
}
#[tokio::test]
async fn test_data_backup_service_rejects_path_traversal() {
let dir = std::env::temp_dir().join(format!("paladin_backup_{}", Uuid::new_v4()));
let service = DataBackupService::new(dir.to_string_lossy().to_string());
let mut action = Action::new(
"backup".to_string(),
"malicious backup".to_string(),
"task_manager".to_string(),
"DataBackupService".to_string(),
);
action
.add_argument("backup_name".to_string(), "../escape.json")
.unwrap();
let result = service.execute(&action).await;
assert!(matches!(result, Err(TaskError::ExecutionFailed(_))));
let _ = std::fs::remove_dir_all(&dir);
}
#[tokio::test]
async fn test_content_indexing_service_builds_index() {
let service = ContentIndexingService::new(format!("idx_{}", Uuid::new_v4()));
let mut action = Action::new(
"index".to_string(),
"index this".to_string(),
"task_manager".to_string(),
"ContentIndexingService".to_string(),
);
action
.add_argument("content".to_string(), "alpha beta beta gamma")
.unwrap();
let result = service.execute(&action).await.unwrap().unwrap();
let path = result["index_path"].as_str().unwrap();
assert!(std::path::Path::new(path).exists());
assert_eq!(result["terms_indexed"].as_u64().unwrap(), 3);
assert_eq!(result["status"], "indexed");
let _ = std::fs::remove_file(path);
}
#[tokio::test]
async fn test_email_service_delivers_to_sink() {
let sink = InMemoryEmailSink::new();
let service = EmailNotificationService::with_sink("smtp.test", sink.clone_sink());
let mut action = Action::new(
"email".to_string(),
"hello body".to_string(),
"task_manager".to_string(),
"EmailNotificationService".to_string(),
);
action
.add_argument("to_email".to_string(), "a@b.com")
.unwrap();
action.add_argument("subject".to_string(), "Hi").unwrap();
let result = service.execute(&action).await.unwrap().unwrap();
assert_eq!(result["status"], "sent");
let delivered = sink.delivered();
assert_eq!(delivered.len(), 1);
assert_eq!(delivered[0].to, "a@b.com");
assert_eq!(delivered[0].subject, "Hi");
}
#[tokio::test]
async fn test_email_service_propagates_sink_failure() {
#[derive(Debug, Clone)]
struct FailingSink;
#[async_trait]
impl EmailSink for FailingSink {
async fn deliver(&self, _message: &EmailMessage) -> Result<(), TaskError> {
Err(TaskError::ServiceUnavailable("transport down".to_string()))
}
fn clone_sink(&self) -> Arc<dyn EmailSink> {
Arc::new(self.clone())
}
}
let service = EmailNotificationService::with_sink("smtp.test", Arc::new(FailingSink));
let action = Action::new(
"email".to_string(),
"body".to_string(),
"task_manager".to_string(),
"EmailNotificationService".to_string(),
);
let result = service.execute(&action).await;
assert!(matches!(result, Err(TaskError::ServiceUnavailable(_))));
}
}