taskflow_rs/executor/
mod.rs

1pub mod config;
2pub mod handlers;
3pub mod task_executor;
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::time::Duration;
8use tracing::{error, info, warn};
9
10use crate::Task;
11
12use crate::error::Result;
13use crate::scheduler::Scheduler;
14use crate::task::TaskHandler;
15pub use config::ExecutorConfig;
16pub use handlers::{HttpTaskHandler, ShellTaskHandler};
17pub use task_executor::TaskExecutor;
18
19#[derive(Clone)]
20pub struct Executor {
21    config: ExecutorConfig,
22    scheduler: Arc<Scheduler>,
23    handlers: HashMap<String, Arc<dyn TaskHandler>>,
24    executor: TaskExecutor,
25}
26
27impl Executor {
28    pub fn new(scheduler: Arc<Scheduler>, config: ExecutorConfig) -> Self {
29        Self {
30            config,
31            scheduler,
32            handlers: HashMap::new(),
33            executor: TaskExecutor::new(),
34        }
35    }
36
37    pub fn register_handler(&mut self, handler: Arc<dyn TaskHandler>) {
38        let task_type = handler.task_type().to_string();
39        self.handlers.insert(task_type.clone(), handler);
40        info!("Registered task handler for type: {}", task_type);
41    }
42
43    pub async fn start(&self) -> Result<()> {
44        info!("Starting executor: {}", self.config.worker_id);
45
46        let shutdown_signal = self.executor.wait_for_shutdown();
47
48        let executor_arc = Arc::new(self.clone());
49
50        let heartbeat_task = {
51            let executor = Arc::clone(&executor_arc);
52            tokio::spawn(async move {
53                executor.start_heartbeat_loop().await;
54            })
55        };
56
57        let execution_task = {
58            let executor = Arc::clone(&executor_arc);
59            tokio::spawn(async move {
60                executor.start_execution_loop().await;
61            })
62        };
63
64        tokio::select! {
65            _ = shutdown_signal.notified() => {
66                info!("Shutdown signal received");
67            }
68            result = heartbeat_task => {
69                if let Err(e) = result {
70                    error!("Heartbeat task failed: {}", e);
71                }
72            }
73            result = execution_task => {
74                if let Err(e) = result {
75                    error!("Execution task failed: {}", e);
76                }
77            }
78        }
79
80        self.executor.wait_for_running_tasks().await;
81        info!("Executor stopped: {}", self.config.worker_id);
82        Ok(())
83    }
84
85    pub async fn shutdown(&self) {
86        info!("Initiating executor shutdown: {}", self.config.worker_id);
87        self.executor.shutdown();
88    }
89
90    async fn start_execution_loop(&self) {
91        let mut interval = tokio::time::interval(Duration::from_secs(1));
92
93        loop {
94            tokio::select! {
95                _ = self.executor.wait_for_shutdown().notified() => {
96                    break;
97                }
98                _ = interval.tick() => {
99                    if let Err(e) = self.process_tasks().await {
100                        error!("Error processing tasks: {}", e);
101                    }
102                }
103            }
104        }
105    }
106
107    async fn start_heartbeat_loop(&self) {
108        let mut interval =
109            tokio::time::interval(Duration::from_secs(self.config.heartbeat_interval_seconds));
110
111        loop {
112            tokio::select! {
113                _ = self.executor.wait_for_shutdown().notified() => {
114                    break;
115                }
116                _ = interval.tick() => {
117                    self.send_heartbeat().await;
118                }
119            }
120        }
121    }
122
123    async fn process_tasks(&self) -> Result<()> {
124        let running_count = self.executor.running_count().await;
125        if running_count >= self.config.max_concurrent_tasks {
126            return Ok(());
127        }
128
129        let available_slots = self.config.max_concurrent_tasks - running_count;
130
131        for _ in 0..available_slots {
132            if let Some(task) = self.scheduler.get_next_task().await {
133                if self.handlers.contains_key(&task.definition.task_type) {
134                    self.execute_task(task).await;
135                } else {
136                    warn!(
137                        "No handler registered for task type: {}",
138                        task.definition.task_type
139                    );
140                    self.scheduler
141                        .complete_task(
142                            &task.definition.id,
143                            false,
144                            None,
145                            Some(format!(
146                                "No handler for task type: {}",
147                                task.definition.task_type
148                            )),
149                        )
150                        .await?;
151                }
152            } else {
153                break;
154            }
155        }
156
157        Ok(())
158    }
159
160    async fn execute_task(&self, mut task: Task) {
161        let task_id = task.definition.id.clone();
162        let task_type = task.definition.task_type.clone();
163
164        self.executor.add_running_task(task_id.clone()).await;
165
166        task.start_execution(&self.config.worker_id);
167
168        let handler = self.handlers.get(&task_type).unwrap().clone();
169        let scheduler = Arc::clone(&self.scheduler);
170        let executor = self.executor.clone();
171        let timeout_duration = Duration::from_secs(
172            task.definition
173                .timeout_seconds
174                .max(self.config.task_timeout_seconds),
175        );
176
177        tokio::spawn(async move {
178            let result = tokio::time::timeout(timeout_duration, handler.execute(&task)).await;
179
180            let (success, output, error) = match result {
181                Ok(Ok(task_result)) => (task_result.success, task_result.output, task_result.error),
182                Ok(Err(e)) => (false, None, Some(e.to_string())),
183                Err(_) => (false, None, Some("Task execution timeout".to_string())),
184            };
185
186            if let Err(e) = scheduler
187                .complete_task(&task_id, success, output, error)
188                .await
189            {
190                error!("Failed to complete task {}: {}", task_id, e);
191            }
192
193            executor.remove_running_task(&task_id).await;
194
195            info!(
196                "Task execution finished: {} (success: {})",
197                task_id, success
198            );
199        });
200    }
201
202    async fn send_heartbeat(&self) {
203        let running_count = self.executor.running_count().await;
204        info!(
205            "Executor heartbeat: {} (running tasks: {})",
206            self.config.worker_id, running_count
207        );
208    }
209}