pub mod listener;
pub mod scheduler;
pub mod types;
pub use crate::core::platform::container::orchestration_context::OrchestrationContext;
pub use listener::{EventListener, ListenerConfig, ListenerService, ListenerStats};
pub use scheduler::{Schedule, Scheduler, SchedulerStats};
pub use types::{
ContentAnalysisType, ContentProcessingResult, ContentProcessor, DefaultContentProcessor,
ListenerError, OrchestratorError, OrchestratorStats, SchedulerError,
};
use crate::application::use_cases::queue_orchestrator::QueueService;
use crate::core::base::component::action::{Action, ActionPriority};
use crate::core::base::component::event::Event;
use crate::core::base::entity::message::{Location, Message, MessagePriority};
use crate::core::platform::container::content::ContentItem;
use crate::core::platform::container::job::Job;
use crate::core::platform::container::queue_item::QueueItem;
use crate::core::platform::container::task::{Task, TaskError, TaskService};
use crate::core::platform::container::trigger::{Trigger, TriggerCondition};
use crate::core::platform::container::workflow::{
Workflow, WorkflowExecutionOrder, WorkflowListener,
};
use chrono::Utc;
use listener::ListenerOrchestrator;
use scheduler::SchedulerOrchestrator;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;
pub struct Orchestrator {
scheduler: Arc<Mutex<SchedulerOrchestrator>>,
queue_service: Arc<QueueService>,
listener_service: Arc<ListenerOrchestrator>,
task_services: Arc<RwLock<HashMap<String, Box<dyn TaskService>>>>,
workflows: Arc<RwLock<HashMap<Uuid, Workflow>>>,
active_sessions: Arc<RwLock<HashMap<Uuid, OrchestrationContext>>>,
content_processors: Arc<RwLock<HashMap<String, Box<dyn ContentProcessor>>>>,
}
impl Orchestrator {
pub fn new() -> Self {
Self {
scheduler: Arc::new(Mutex::new(SchedulerOrchestrator::new())),
queue_service: Arc::new(QueueService::new()),
listener_service: Arc::new(ListenerOrchestrator::new()),
task_services: Arc::new(RwLock::new(HashMap::new())),
workflows: Arc::new(RwLock::new(HashMap::new())),
active_sessions: Arc::new(RwLock::new(HashMap::new())),
content_processors: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn start(&self) -> Result<(), OrchestratorError> {
self.initialize_default_services().await?;
let scheduler_clone = Arc::clone(&self.scheduler);
tokio::spawn(async move {
let mut scheduler = scheduler_clone.lock().await;
scheduler.start().await;
});
println!("Orchestrator started successfully");
Ok(())
}
pub async fn register_task_service(
&self,
service: Box<dyn TaskService>,
) -> Result<(), OrchestratorError> {
let service_name = service.name().to_string();
{
let mut scheduler = self.scheduler.lock().await;
scheduler.register_service(service.clone_service());
}
{
let mut services = self.task_services.write().await;
services.insert(service_name.clone(), service);
}
println!("Registered task service: {}", service_name);
Ok(())
}
pub async fn register_event_listener(
&self,
listener: Box<dyn EventListener>,
) -> Result<(), OrchestratorError> {
self.listener_service
.register_listener(listener)
.await
.map_err(OrchestratorError::ListenerError)?;
Ok(())
}
pub async fn register_content_processor(
&self,
processor: Box<dyn ContentProcessor>,
) -> Result<(), OrchestratorError> {
let processor_name = processor.name().to_string();
let mut processors = self.content_processors.write().await;
processors.insert(processor_name.clone(), processor);
println!("Registered content processor: {}", processor_name);
Ok(())
}
pub async fn execute_job(
&self,
job: Job,
context: OrchestrationContext,
) -> Result<Uuid, OrchestratorError> {
let job_id = job.id();
self.start_session(context.clone()).await?;
let services: HashMap<String, Box<dyn TaskService>> = {
let services_guard = self.task_services.read().await;
services_guard
.iter()
.map(|(k, v)| (k.clone(), v.clone_service()))
.collect()
};
let mut job_clone = job.clone();
match job_clone.execute(&services).await {
Ok(_) => {
println!("Job '{}' executed successfully", job_clone.name());
self.end_session(context.session_id).await?;
Ok(job_id)
}
Err(e) => {
println!("Job '{}' execution failed: {}", job_clone.name(), e);
self.end_session(context.session_id).await?;
Err(OrchestratorError::JobError(e))
}
}
}
pub async fn schedule_job(
&self,
job: Job,
schedule: Schedule,
context: OrchestrationContext,
) -> Result<Uuid, OrchestratorError> {
let job_id = job.id();
self.start_session(context).await?;
{
let mut scheduler = self.scheduler.lock().await;
scheduler
.add_job(job, schedule)
.map_err(OrchestratorError::SchedulerError)?;
}
println!("Job scheduled with ID: {}", job_id);
Ok(job_id)
}
pub async fn queue_job(
&self,
job: Job,
queue_name: &str,
context: OrchestrationContext,
) -> Result<Uuid, OrchestratorError> {
let message = Message::with_priority(
Location::system("orchestrator"),
Location::service("job_processor"),
job.clone(),
match job.action.priority {
ActionPriority::Low => MessagePriority::Low,
ActionPriority::Normal => MessagePriority::Normal,
ActionPriority::High => MessagePriority::High,
ActionPriority::Critical => MessagePriority::Critical,
},
);
let queue_item = QueueItem::new(queue_name.to_string(), message, None);
let item_id = self
.queue_service
.enqueue(queue_name, queue_item)
.await
.map_err(OrchestratorError::QueueError)?;
self.start_session(context).await?;
println!("Job queued with item ID: {}", item_id);
Ok(item_id)
}
pub async fn process_content(
&self,
content: ContentItem,
processor_name: &str,
context: OrchestrationContext,
) -> Result<ContentProcessingResult, OrchestratorError> {
let processor = {
let processors = self.content_processors.read().await;
processors
.get(processor_name)
.ok_or_else(|| OrchestratorError::ProcessorNotFound(processor_name.to_string()))?
.clone_box()?
};
self.start_session(context.clone()).await?;
let result = processor.process_content(content, context.clone()).await?;
self.end_session(context.session_id).await?;
Ok(result)
}
pub async fn create_workflow(&self, mut workflow: Workflow) -> Result<Uuid, OrchestratorError> {
let workflow_id = workflow.id;
for workflow_queue in &workflow.queues {
match self
.queue_service
.create_queue(
workflow_queue.name.clone(),
Some(workflow_queue.config.clone()),
)
.await
{
Ok(_) => println!("Created queue: {}", workflow_queue.name),
Err(e) => {
println!(
"Warning: Failed to create queue {}: {:?}",
workflow_queue.name, e
);
}
}
}
for workflow_listener in &workflow.listeners {
let listener = self
.create_workflow_listener(workflow_listener.clone(), workflow_id)
.await?;
if let Err(e) = self.listener_service.register_listener(listener).await {
println!(
"Warning: Failed to register listener {}: {:?}",
workflow_listener.name, e
);
}
}
match &workflow.execution_order {
WorkflowExecutionOrder::Sequential => {
println!(
"Sequential execution order configured for workflow {}",
workflow_id
);
}
WorkflowExecutionOrder::Parallel => {
println!(
"Parallel execution order configured for workflow {}",
workflow_id
);
}
WorkflowExecutionOrder::EventDriven => {
println!(
"Event-driven execution order configured for workflow {}",
workflow_id
);
}
WorkflowExecutionOrder::Custom(stages) => {
println!(
"Custom execution order with {} stages configured for workflow {}",
stages.len(),
workflow_id
);
}
}
workflow.updated_at = Utc::now();
let mut workflows = self.workflows.write().await;
workflows.insert(workflow_id, workflow);
println!(
"Workflow '{}' created with ID: {}",
workflows.get(&workflow_id).unwrap().name,
workflow_id
);
Ok(workflow_id)
}
pub async fn process_event(&self, event: Event) -> Result<Vec<Uuid>, OrchestratorError> {
let trigger_ids = self
.listener_service
.process_event(event)
.await
.map_err(OrchestratorError::ListenerError)?;
for trigger_id in &trigger_ids {
if let Some(trigger) = self.listener_service.get_next_trigger().await {
println!(
"Processing trigger {} for trigger ID {}",
trigger.name, trigger_id
);
self.execute_trigger(trigger).await?;
}
}
Ok(trigger_ids)
}
async fn execute_trigger(&self, trigger: Trigger) -> Result<(), OrchestratorError> {
let workflows = self.workflows.read().await;
let mut target_job: Option<Job> = None;
for workflow in workflows.values() {
for listener in &workflow.listeners {
if listener.name == trigger.source
&& let Some(target_job_id) = listener.target_job_id
{
if let Some(job) = workflow.jobs.iter().find(|j| j.id() == target_job_id) {
target_job = Some(job.clone());
break;
}
}
}
if target_job.is_some() {
break;
}
}
let job = if let Some(job) = target_job {
job
} else {
let task = Task::new(
trigger.action.name.clone(),
trigger.action.description.clone(),
trigger.target.clone(),
);
Job::new(
format!("Triggered Job: {}", trigger.name),
format!("Job created from trigger: {}", trigger.description),
vec![task],
)
};
let context = OrchestrationContext::new(
trigger.source.clone(),
"production".to_string(), );
self.execute_job(job, context).await?;
Ok(())
}
pub async fn create_content_analysis_workflow(
&self,
content_items: Vec<ContentItem>,
analysis_type: ContentAnalysisType,
context: OrchestrationContext,
) -> Result<Uuid, OrchestratorError> {
if content_items.is_empty() {
return Err(OrchestratorError::ConfigurationError(
"No content items provided".to_string(),
));
}
let workflow_id = Uuid::new_v4();
let mut jobs = Vec::new();
for (i, content_item) in content_items.iter().enumerate() {
let content_task = self
.create_content_analysis_task(content_item.clone(), analysis_type.clone(), i)
.await?;
let job = Job::new(
format!("Content Analysis Job {}", i + 1),
format!("Analyze content item: {}", content_item.uuid()),
vec![content_task],
)
.with_priority(ActionPriority::Normal);
jobs.push(job);
}
let workflow = Workflow {
id: workflow_id,
name: format!("Content Analysis Workflow - {}", analysis_type.name()),
description: format!(
"Analyze {} content items using {}",
content_items.len(),
analysis_type.name()
),
jobs,
listeners: vec![],
queues: vec![], execution_order: WorkflowExecutionOrder::Parallel,
context: context.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
};
let mut workflows = self.workflows.write().await;
workflows.insert(workflow_id, workflow);
println!("Content analysis workflow created with ID: {}", workflow_id);
Ok(workflow_id)
}
async fn start_session(&self, context: OrchestrationContext) -> Result<(), OrchestratorError> {
let mut sessions = self.active_sessions.write().await;
sessions.insert(context.session_id, context.clone());
println!("Started orchestration session: {}", context.session_id);
Ok(())
}
async fn end_session(&self, session_id: Uuid) -> Result<(), OrchestratorError> {
let mut sessions = self.active_sessions.write().await;
if let Some(context) = sessions.remove(&session_id) {
let duration = Utc::now().timestamp() - context.started_at.timestamp();
println!(
"Ended orchestration session: {} (duration: {}s)",
session_id, duration
);
}
Ok(())
}
pub async fn get_stats(&self) -> OrchestratorStats {
let scheduler_stats = {
let scheduler = self.scheduler.lock().await;
scheduler.stats()
};
let queue_stats = self.queue_service.get_all_stats().await;
let listener_stats = self.listener_service.get_all_stats().await;
let workflows_count = {
let workflows = self.workflows.read().await;
workflows.len()
};
let active_sessions_count = {
let sessions = self.active_sessions.read().await;
sessions.len()
};
let services_count = {
let services = self.task_services.read().await;
services.len()
};
let processors_count = {
let processors = self.content_processors.read().await;
processors.len()
};
OrchestratorStats {
active_sessions: active_sessions_count,
total_workflows: workflows_count,
total_services: services_count,
total_processors: processors_count,
scheduler_stats,
queue_stats,
listener_stats,
}
}
async fn initialize_default_services(&self) -> Result<(), OrchestratorError> {
self.register_task_service(Box::new(
crate::core::platform::container::task::DataBackupService {
backup_path: "/var/backups".to_string(),
},
))
.await?;
self.register_task_service(Box::new(
crate::core::platform::container::task::ContentIndexingService {
index_name: "main_index".to_string(),
},
))
.await?;
self.register_task_service(Box::new(
crate::core::platform::container::task::EmailNotificationService {
smtp_server: "localhost:587".to_string(),
},
))
.await?;
self.register_content_processor(Box::new(DefaultContentProcessor))
.await?;
Ok(())
}
async fn create_workflow_listener(
&self,
workflow_listener: WorkflowListener,
workflow_id: Uuid,
) -> Result<Box<dyn EventListener>, OrchestratorError> {
Ok(Box::new(WorkflowEventListener {
name: workflow_listener.name,
conditions: workflow_listener.conditions,
target_job_id: workflow_listener.target_job_id,
target_queue: workflow_listener.target_queue,
workflow_id,
config: ListenerConfig::default(),
}))
}
async fn create_content_analysis_task(
&self,
content_item: ContentItem,
analysis_type: ContentAnalysisType,
index: usize,
) -> Result<Task, OrchestratorError> {
let mut task = Task::new(
format!("Content Analysis Task {}", index + 1),
format!("Analyze content using {}", analysis_type.name()),
"ContentAnalysisService".to_string(),
);
task.action
.add_argument("content_item".to_string(), content_item)
.map_err(|e| OrchestratorError::TaskError(TaskError::ActionError(e)))?;
task.action
.add_argument("analysis_type".to_string(), analysis_type)
.map_err(|e| OrchestratorError::TaskError(TaskError::ActionError(e)))?;
Ok(task)
}
}
impl Default for Orchestrator {
fn default() -> Self {
Self::new()
}
}
struct WorkflowEventListener {
name: String,
conditions: Vec<TriggerCondition>,
target_job_id: Option<Uuid>,
target_queue: Option<String>,
workflow_id: Uuid,
config: ListenerConfig,
}
use async_trait::async_trait;
#[async_trait]
impl EventListener for WorkflowEventListener {
fn name(&self) -> &str {
&self.name
}
fn description(&self) -> &str {
"Workflow event listener"
}
fn conditions(&self) -> &[TriggerCondition] {
&self.conditions
}
async fn should_process(&self, event: &Event) -> bool {
self.conditions.iter().any(|condition| {
condition.event_type_pattern == "*"
|| condition.event_type_pattern == event.event_type
|| event
.event_type
.contains(&condition.event_type_pattern.replace('*', ""))
})
}
async fn create_trigger(&self, event: Event) -> Result<Trigger, ListenerError> {
let mut action = Action::new(
format!("Workflow Action: {}", self.name),
format!(
"Action triggered by workflow listener for event: {}",
event.event_type
),
self.name.clone(),
self.target_queue
.clone()
.unwrap_or_else(|| "default_queue".to_string()),
);
if let Some(job_id) = self.target_job_id {
action
.add_argument("target_job_id".to_string(), job_id)
.map_err(|e| ListenerError::TriggerCreationFailed(e.to_string()))?;
}
let condition = self
.conditions
.first()
.ok_or_else(|| {
ListenerError::InvalidConfiguration("No conditions defined".to_string())
})?
.clone();
Ok(Trigger::new(
format!("Workflow Trigger: {}", self.name),
format!(
"Trigger for workflow {} from event {}",
self.workflow_id, event.event_type
),
self.name.clone(),
self.target_queue
.clone()
.unwrap_or_else(|| "default_service".to_string()),
event,
action,
condition,
))
}
fn config(&self) -> &ListenerConfig {
&self.config
}
fn update_config(&mut self, config: ListenerConfig) {
self.config = config;
}
async fn health_check(&self) -> Result<bool, ListenerError> {
Ok(true)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::application::use_cases::queue_orchestrator::QueueError;
use crate::core::platform::container::content::{ContentType, TextContent};
#[tokio::test]
async fn test_orchestrator_creation() {
let orchestrator = Orchestrator::new();
let stats = orchestrator.get_stats().await;
assert_eq!(stats.active_sessions, 0);
assert_eq!(stats.total_workflows, 0);
}
#[tokio::test]
async fn test_orchestrator_default() {
let orchestrator = Orchestrator::default();
let stats = orchestrator.get_stats().await;
assert_eq!(stats.active_sessions, 0);
}
#[tokio::test]
async fn test_orchestration_context_creation() {
let context = OrchestrationContext::new("test_user".to_string(), "dev".to_string());
assert_eq!(context.initiator, "test_user");
assert_eq!(context.environment, "dev");
assert!(context.correlation_id.is_none());
assert!(context.metadata.is_empty());
assert!(context.started_at <= Utc::now());
}
#[tokio::test]
async fn test_orchestration_context_with_correlation() {
let correlation_id = Uuid::new_v4();
let context = OrchestrationContext::new("test_user".to_string(), "staging".to_string())
.with_correlation(correlation_id);
assert_eq!(context.correlation_id, Some(correlation_id));
}
#[tokio::test]
async fn test_orchestration_context_add_metadata() {
let mut context = OrchestrationContext::new("test_user".to_string(), "prod".to_string());
let result = context.add_metadata("key1".to_string(), "value1");
assert!(result.is_ok());
assert_eq!(context.metadata.len(), 1);
assert!(context.metadata.contains_key("key1"));
}
#[tokio::test]
async fn test_orchestration_context_add_complex_metadata() {
let mut context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let complex_data = serde_json::json!({
"nested": {
"field": 123,
"array": [1, 2, 3]
}
});
let result = context.add_metadata("complex".to_string(), complex_data);
assert!(result.is_ok());
assert!(context.metadata.contains_key("complex"));
}
#[tokio::test]
async fn test_error_variants_display() {
let scheduler_err =
OrchestratorError::SchedulerError(SchedulerError::JobNotFound(Uuid::new_v4()));
assert!(scheduler_err.to_string().contains("Scheduler error"));
let queue_err =
OrchestratorError::QueueError(QueueError::QueueNotFound("test_queue".to_string()));
assert!(queue_err.to_string().contains("Queue error"));
let listener_err = OrchestratorError::ListenerError(ListenerError::ListenerNotFound(
"test_listener".to_string(),
));
assert!(listener_err.to_string().contains("Listener error"));
let processor_err = OrchestratorError::ProcessorNotFound("test_processor".to_string());
assert_eq!(
processor_err.to_string(),
"Processor not found: test_processor"
);
let workflow_err = OrchestratorError::WorkflowNotFound(Uuid::new_v4());
assert!(workflow_err.to_string().contains("Workflow not found"));
let session_err = OrchestratorError::SessionNotFound(Uuid::new_v4());
assert!(session_err.to_string().contains("Session not found"));
let serialization_err = OrchestratorError::SerializationError("invalid json".to_string());
assert_eq!(
serialization_err.to_string(),
"Serialization error: invalid json"
);
let config_err = OrchestratorError::ConfigurationError("bad config".to_string());
assert_eq!(config_err.to_string(), "Configuration error: bad config");
let service_err = OrchestratorError::ServiceError("service down".to_string());
assert_eq!(service_err.to_string(), "Service error: service down");
}
#[tokio::test]
async fn test_orchestrator_service_registration() {
let orchestrator = Orchestrator::new();
let service = crate::core::platform::container::task::DataBackupService {
backup_path: "/test/backup".to_string(),
};
let result = orchestrator.register_task_service(Box::new(service)).await;
assert!(result.is_ok());
let stats = orchestrator.get_stats().await;
assert_eq!(stats.total_services, 1);
}
#[tokio::test]
async fn test_start_and_end_session() {
let orchestrator = Orchestrator::new();
let context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let session_id = context.session_id;
let start_result = orchestrator.start_session(context).await;
assert!(start_result.is_ok());
let stats = orchestrator.get_stats().await;
assert_eq!(stats.active_sessions, 1);
let end_result = orchestrator.end_session(session_id).await;
assert!(end_result.is_ok());
let stats = orchestrator.get_stats().await;
assert_eq!(stats.active_sessions, 0);
}
#[tokio::test]
async fn test_end_nonexistent_session() {
let orchestrator = Orchestrator::new();
let fake_session_id = Uuid::new_v4();
let result = orchestrator.end_session(fake_session_id).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_content_processing() {
let orchestrator = Orchestrator::new();
orchestrator.start().await.unwrap();
let text_content = TextContent::new(None, Some("Test content".to_string())).unwrap();
let content_item = ContentItem::new(ContentType::Text(text_content)).unwrap();
let context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let result = orchestrator
.process_content(content_item, "DefaultContentProcessor", context)
.await;
assert!(result.is_ok());
let processing_result = result.unwrap();
assert!(processing_result.success);
assert!(processing_result.result_data.is_some());
assert!(processing_result.error.is_none());
assert_eq!(processing_result.processor_name, "DefaultContentProcessor");
}
#[tokio::test]
async fn test_content_processing_with_nonexistent_processor() {
let orchestrator = Orchestrator::new();
orchestrator.start().await.unwrap();
let text_content = TextContent::new(None, Some("Test".to_string())).unwrap();
let content_item = ContentItem::new(ContentType::Text(text_content)).unwrap();
let context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let result = orchestrator
.process_content(content_item, "NonExistentProcessor", context)
.await;
assert!(result.is_err());
match result {
Err(OrchestratorError::ProcessorNotFound(name)) => {
assert_eq!(name, "NonExistentProcessor");
}
_ => panic!("Expected ProcessorNotFound error"),
}
}
#[tokio::test]
async fn test_job_execution() {
let orchestrator = Orchestrator::new();
orchestrator.start().await.unwrap();
let task = Task::new(
"Test Task".to_string(),
"A test task".to_string(),
"DataBackupService".to_string(),
);
let job = Job::new("Test Job".to_string(), "A test job".to_string(), vec![task]);
let context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let result = orchestrator.execute_job(job, context).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_schedule_job() {
let orchestrator = Orchestrator::new();
orchestrator.start().await.unwrap();
let task = Task::new(
"Scheduled Task".to_string(),
"A scheduled task".to_string(),
"DataBackupService".to_string(),
);
let job = Job::new(
"Scheduled Job".to_string(),
"A scheduled job".to_string(),
vec![task],
);
let schedule = Schedule::Interval(std::time::Duration::from_secs(3600));
let context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let result = orchestrator.schedule_job(job, schedule, context).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_queue_job() {
let orchestrator = Orchestrator::new();
orchestrator.start().await.unwrap();
let task = Task::new(
"Queued Task".to_string(),
"A queued task".to_string(),
"DataBackupService".to_string(),
);
let job = Job::new(
"Queued Job".to_string(),
"A queued job".to_string(),
vec![task],
);
let context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let _ = orchestrator
.queue_service
.create_queue("test_queue".to_string(), None)
.await;
let result = orchestrator.queue_job(job, "test_queue", context).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_content_analysis_workflow() {
let orchestrator = Arc::new(Orchestrator::new());
orchestrator.start().await.unwrap();
let text_content = TextContent::new(
None, Some("This is test content for analysis".to_string()),
)
.unwrap();
let content_item = ContentItem::new(ContentType::Text(text_content)).unwrap();
let context = OrchestrationContext::new("test_service".to_string(), "test".to_string());
let result = orchestrator
.create_content_analysis_workflow(
vec![content_item],
ContentAnalysisType::LanguageDetection,
context,
)
.await;
match result {
Ok(workflow_id) => {
assert!(workflow_id != Uuid::nil());
let workflows = orchestrator.workflows.read().await;
assert!(workflows.contains_key(&workflow_id));
let workflow = workflows.get(&workflow_id).unwrap();
assert!(!workflow.jobs.is_empty());
assert!(workflow.name.contains("Content Analysis Workflow"));
}
Err(e) => {
panic!("Workflow creation failed with error: {:?}", e);
}
}
}
#[tokio::test]
async fn test_event_processing() {
let orchestrator = Orchestrator::new();
orchestrator.start().await.unwrap();
let event = Event::new(
"test_event".to_string(),
serde_json::json!({"test": "data"}),
"test_source".to_string(),
);
let result = orchestrator.process_event(event).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_multiple_sessions() {
let orchestrator = Orchestrator::new();
let context1 = OrchestrationContext::new("user1".to_string(), "dev".to_string());
let context2 = OrchestrationContext::new("user2".to_string(), "prod".to_string());
let context3 = OrchestrationContext::new("user3".to_string(), "staging".to_string());
orchestrator.start_session(context1.clone()).await.unwrap();
orchestrator.start_session(context2.clone()).await.unwrap();
orchestrator.start_session(context3.clone()).await.unwrap();
let stats = orchestrator.get_stats().await;
assert_eq!(stats.active_sessions, 3);
orchestrator.end_session(context1.session_id).await.unwrap();
let stats = orchestrator.get_stats().await;
assert_eq!(stats.active_sessions, 2);
}
#[tokio::test]
async fn test_workflow_creation() {
let orchestrator = Orchestrator::new();
orchestrator.start().await.unwrap();
let context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let workflow = Workflow {
id: Uuid::new_v4(),
name: "Test Workflow".to_string(),
description: "A test workflow".to_string(),
jobs: Vec::new(),
listeners: Vec::new(),
queues: Vec::new(),
execution_order: WorkflowExecutionOrder::Sequential,
context,
created_at: Utc::now(),
updated_at: Utc::now(),
};
let result = orchestrator.create_workflow(workflow).await;
assert!(result.is_ok());
let stats = orchestrator.get_stats().await;
assert_eq!(stats.total_workflows, 1);
}
#[tokio::test]
async fn test_default_content_processor() {
let processor = DefaultContentProcessor;
assert_eq!(processor.name(), "DefaultContentProcessor");
let text_content = TextContent::new(None, Some("Test content".to_string())).unwrap();
let content_item = ContentItem::new(ContentType::Text(text_content)).unwrap();
let context = OrchestrationContext::new("test_user".to_string(), "test".to_string());
let result = processor.process_content(content_item, context).await;
assert!(result.is_ok());
let processing_result = result.unwrap();
assert!(processing_result.success);
assert_eq!(processing_result.processor_name, "DefaultContentProcessor");
}
#[tokio::test]
async fn test_content_processor_clone() {
let processor = DefaultContentProcessor;
let cloned = processor.clone_box();
assert!(cloned.is_ok());
let cloned_processor = cloned.unwrap();
assert_eq!(cloned_processor.name(), "DefaultContentProcessor");
}
#[tokio::test]
async fn test_orchestrator_stats_structure() {
let orchestrator = Orchestrator::new();
orchestrator.start().await.unwrap();
let stats = orchestrator.get_stats().await;
assert_eq!(stats.active_sessions, 0);
assert_eq!(stats.total_workflows, 0);
assert_eq!(stats.total_services, 3); assert_eq!(stats.total_processors, 1); assert!(stats.queue_stats.is_empty());
assert!(stats.listener_stats.is_empty());
}
#[tokio::test]
async fn test_content_processing_result_structure() {
let result = ContentProcessingResult {
content_id: Uuid::new_v4(),
processor_name: "TestProcessor".to_string(),
processing_time_ms: 150,
success: true,
result_data: Some(serde_json::json!({"test": "data"})),
error: None,
metadata: HashMap::new(),
};
assert_eq!(result.processor_name, "TestProcessor");
assert_eq!(result.processing_time_ms, 150);
assert!(result.success);
assert!(result.result_data.is_some());
assert!(result.error.is_none());
assert!(result.metadata.is_empty());
}
}