taskflow_rs/executor/
mod.rs1pub mod config;
2pub mod handlers;
3pub mod task_executor;
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::time::Duration;
8use tracing::{error, info, warn};
9
10use crate::Task;
11
12use crate::error::Result;
13use crate::scheduler::Scheduler;
14use crate::task::TaskHandler;
15pub use config::ExecutorConfig;
16pub use handlers::{HttpTaskHandler, ShellTaskHandler};
17pub use task_executor::TaskExecutor;
18
19#[derive(Clone)]
20pub struct Executor {
21 config: ExecutorConfig,
22 scheduler: Arc<Scheduler>,
23 handlers: HashMap<String, Arc<dyn TaskHandler>>,
24 executor: TaskExecutor,
25}
26
27impl Executor {
28 pub fn new(scheduler: Arc<Scheduler>, config: ExecutorConfig) -> Self {
29 Self {
30 config,
31 scheduler,
32 handlers: HashMap::new(),
33 executor: TaskExecutor::new(),
34 }
35 }
36
37 pub fn register_handler(&mut self, handler: Arc<dyn TaskHandler>) {
38 let task_type = handler.task_type().to_string();
39 self.handlers.insert(task_type.clone(), handler);
40 info!("Registered task handler for type: {}", task_type);
41 }
42
43 pub async fn start(&self) -> Result<()> {
44 info!("Starting executor: {}", self.config.worker_id);
45
46 let shutdown_signal = self.executor.wait_for_shutdown();
47
48 let executor_arc = Arc::new(self.clone());
49
50 let heartbeat_task = {
51 let executor = Arc::clone(&executor_arc);
52 tokio::spawn(async move {
53 executor.start_heartbeat_loop().await;
54 })
55 };
56
57 let execution_task = {
58 let executor = Arc::clone(&executor_arc);
59 tokio::spawn(async move {
60 executor.start_execution_loop().await;
61 })
62 };
63
64 tokio::select! {
65 _ = shutdown_signal.notified() => {
66 info!("Shutdown signal received");
67 }
68 result = heartbeat_task => {
69 if let Err(e) = result {
70 error!("Heartbeat task failed: {}", e);
71 }
72 }
73 result = execution_task => {
74 if let Err(e) = result {
75 error!("Execution task failed: {}", e);
76 }
77 }
78 }
79
80 self.executor.wait_for_running_tasks().await;
81 info!("Executor stopped: {}", self.config.worker_id);
82 Ok(())
83 }
84
85 pub async fn shutdown(&self) {
86 info!("Initiating executor shutdown: {}", self.config.worker_id);
87 self.executor.shutdown();
88 }
89
90 async fn start_execution_loop(&self) {
91 let mut interval = tokio::time::interval(Duration::from_secs(1));
92
93 loop {
94 tokio::select! {
95 _ = self.executor.wait_for_shutdown().notified() => {
96 break;
97 }
98 _ = interval.tick() => {
99 if let Err(e) = self.process_tasks().await {
100 error!("Error processing tasks: {}", e);
101 }
102 }
103 }
104 }
105 }
106
107 async fn start_heartbeat_loop(&self) {
108 let mut interval =
109 tokio::time::interval(Duration::from_secs(self.config.heartbeat_interval_seconds));
110
111 loop {
112 tokio::select! {
113 _ = self.executor.wait_for_shutdown().notified() => {
114 break;
115 }
116 _ = interval.tick() => {
117 self.send_heartbeat().await;
118 }
119 }
120 }
121 }
122
123 async fn process_tasks(&self) -> Result<()> {
124 let running_count = self.executor.running_count().await;
125 if running_count >= self.config.max_concurrent_tasks {
126 return Ok(());
127 }
128
129 let available_slots = self.config.max_concurrent_tasks - running_count;
130
131 for _ in 0..available_slots {
132 if let Some(task) = self.scheduler.get_next_task().await {
133 if self.handlers.contains_key(&task.definition.task_type) {
134 self.execute_task(task).await;
135 } else {
136 warn!(
137 "No handler registered for task type: {}",
138 task.definition.task_type
139 );
140 self.scheduler
141 .complete_task(
142 &task.definition.id,
143 false,
144 None,
145 Some(format!(
146 "No handler for task type: {}",
147 task.definition.task_type
148 )),
149 )
150 .await?;
151 }
152 } else {
153 break;
154 }
155 }
156
157 Ok(())
158 }
159
160 async fn execute_task(&self, mut task: Task) {
161 let task_id = task.definition.id.clone();
162 let task_type = task.definition.task_type.clone();
163
164 self.executor.add_running_task(task_id.clone()).await;
165
166 task.start_execution(&self.config.worker_id);
167
168 let handler = self.handlers.get(&task_type).unwrap().clone();
169 let scheduler = Arc::clone(&self.scheduler);
170 let executor = self.executor.clone();
171 let timeout_duration = Duration::from_secs(
172 task.definition
173 .timeout_seconds
174 .max(self.config.task_timeout_seconds),
175 );
176
177 tokio::spawn(async move {
178 let result = tokio::time::timeout(timeout_duration, handler.execute(&task)).await;
179
180 let (success, output, error) = match result {
181 Ok(Ok(task_result)) => (task_result.success, task_result.output, task_result.error),
182 Ok(Err(e)) => (false, None, Some(e.to_string())),
183 Err(_) => (false, None, Some("Task execution timeout".to_string())),
184 };
185
186 if let Err(e) = scheduler
187 .complete_task(&task_id, success, output, error)
188 .await
189 {
190 error!("Failed to complete task {}: {}", task_id, e);
191 }
192
193 executor.remove_running_task(&task_id).await;
194
195 info!(
196 "Task execution finished: {} (success: {})",
197 task_id, success
198 );
199 });
200 }
201
202 async fn send_heartbeat(&self) {
203 let running_count = self.executor.running_count().await;
204 info!(
205 "Executor heartbeat: {} (running tasks: {})",
206 self.config.worker_id, running_count
207 );
208 }
209}