1use std::net::SocketAddr;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23use std::time::{Duration, Instant, UNIX_EPOCH};
24use std::{env, io};
25
26use anyhow::{Context, Result};
27use arrow_flight::flight_service_server::FlightServiceServer;
28use futures::stream::FuturesUnordered;
29use futures::StreamExt;
30use kapot_core::object_store_registry::KapotObjectStoreRegistry;
31use log::{error, info, warn};
32use tempfile::TempDir;
33use tokio::fs::DirEntry;
34use tokio::signal;
35use tokio::sync::mpsc;
36use tokio::task::JoinHandle;
37use tokio::{fs, time};
38use tracing_subscriber::EnvFilter;
39use uuid::Uuid;
40
41use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
42use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
43
44#[cfg(not(windows))]
45use kapot_core::cache_layer::{
46 medium::local_disk::LocalDiskMedium, policy::file::FileCacheLayer, CacheLayer,
47};
48use kapot_core::config::{DataCachePolicy, LogRotationPolicy, TaskSchedulingPolicy};
49use kapot_core::error::KapotError;
50#[cfg(not(windows))]
51use kapot_core::object_store_registry::cache::CachedBasedObjectStoreRegistry;
52use kapot_core::serde::protobuf::executor_resource::Resource;
53use kapot_core::serde::protobuf::executor_status::Status;
54use kapot_core::serde::protobuf::{
55 executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
56 ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus,
57 ExecutorStoppedParams, HeartBeatParams,
58};
59use kapot_core::serde::KapotCodec;
60use kapot_core::utils::{
61 create_grpc_client_connection, create_grpc_server, get_time_before,
62};
63use kapot_core::KAPOT_VERSION;
64
65use crate::execution_engine::ExecutionEngine;
66use crate::executor::{Executor, TasksDrainedFuture};
67use crate::executor_server::TERMINATING;
68use crate::flight_service::KapotFlightService;
69use crate::metrics::LoggingMetricsCollector;
70use crate::shutdown::Shutdown;
71use crate::shutdown::ShutdownNotifier;
72use crate::terminate;
73use crate::{execution_loop, executor_server};
74
/// Configuration for a single executor process: network endpoints, scheduler
/// connection, task-slot count, logging, shuffle-data cleanup, and the
/// optional data cache.
pub struct ExecutorProcessConfig {
    /// Host/interface the Flight server binds to (combined with `port`).
    pub bind_host: String,
    /// Externally reachable host advertised to the scheduler in the
    /// `ExecutorRegistration`; `None` omits the optional host field.
    pub external_host: Option<String>,
    /// Arrow Flight server port (also advertised to the scheduler).
    pub port: u16,
    /// Executor gRPC port advertised to the scheduler.
    pub grpc_port: u16,
    /// Scheduler host used to build the `http://host:port` connection URL.
    pub scheduler_host: String,
    /// Scheduler port used to build the connection URL.
    pub scheduler_port: u16,
    /// How long to keep retrying the initial scheduler connection, in
    /// seconds; 0 means a single attempt with no retry loop.
    pub scheduler_connect_timeout_seconds: u16,
    /// Number of task slots registered with the scheduler; 0 means "use the
    /// number of logical CPUs".
    pub concurrent_tasks: usize,
    /// Push-based (`PushStaged`) vs. poll-based task scheduling.
    pub task_scheduling_policy: TaskSchedulingPolicy,
    /// Directory for rolling log files; when `None`, logs go to stdout.
    pub log_dir: Option<String>,
    /// Working directory for shuffle data; when `None`, a fresh temp dir is
    /// created at startup.
    pub work_dir: Option<String>,
    /// Fallback log filter used when the `RUST_LOG` env var is not set.
    pub special_mod_log_level: String,
    /// Include thread names and ids in log output.
    pub print_thread_info: bool,
    /// File-name prefix for rolling log files.
    pub log_file_name_prefix: String,
    /// Rotation policy for the log file (minutely/hourly/daily/never).
    pub log_rotation_policy: LogRotationPolicy,
    /// Job shuffle data unmodified for longer than this TTL is deleted by the
    /// periodic cleanup loop.
    pub job_data_ttl_seconds: u64,
    /// Interval between cleanup scans; 0 disables the cleanup loop entirely.
    pub job_data_clean_up_interval_seconds: u64,
    /// Optional data cache policy (currently local-disk file caching only).
    pub data_cache_policy: Option<DataCachePolicy>,
    /// Cache directory; must be set when `data_cache_policy` is
    /// `LocalDiskFile` (unwrapped at startup) — TODO confirm with callers.
    pub cache_dir: Option<String>,
    /// Cache capacity passed to `FileCacheLayer::new` — presumably bytes;
    /// verify against the cache layer's documentation.
    pub cache_capacity: u64,
    /// I/O concurrency passed to the cache layer.
    pub cache_io_concurrency: u32,
    /// Maximum gRPC decoding message size (bytes) for the scheduler client.
    pub grpc_max_decoding_message_size: u32,
    /// Maximum gRPC encoding message size (bytes) for the scheduler client.
    pub grpc_max_encoding_message_size: u32,
    /// Heartbeat interval — not read in this file; presumably consumed by
    /// `executor_server::startup` (opt is passed through), verify there.
    pub executor_heartbeat_interval_seconds: u64,
    /// Optional custom execution engine; `None` uses the default engine.
    pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
}
106
107pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
108 let rust_log = env::var(EnvFilter::DEFAULT_ENV);
109 let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone()));
110
111 if let Some(log_dir) = opt.log_dir.clone() {
113 let log_file = match opt.log_rotation_policy {
114 LogRotationPolicy::Minutely => {
115 tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix)
116 }
117 LogRotationPolicy::Hourly => {
118 tracing_appender::rolling::hourly(log_dir, &opt.log_file_name_prefix)
119 }
120 LogRotationPolicy::Daily => {
121 tracing_appender::rolling::daily(log_dir, &opt.log_file_name_prefix)
122 }
123 LogRotationPolicy::Never => {
124 tracing_appender::rolling::never(log_dir, &opt.log_file_name_prefix)
125 }
126 };
127 tracing_subscriber::fmt()
128 .with_ansi(false)
129 .with_thread_names(opt.print_thread_info)
130 .with_thread_ids(opt.print_thread_info)
131 .with_writer(log_file)
132 .with_env_filter(log_filter)
133 .init();
134 } else {
135 tracing_subscriber::fmt()
137 .with_ansi(false)
138 .with_thread_names(opt.print_thread_info)
139 .with_thread_ids(opt.print_thread_info)
140 .with_writer(io::stdout)
141 .with_env_filter(log_filter)
142 .init();
143 }
144
145 let addr = format!("{}:{}", opt.bind_host, opt.port);
146 let addr = addr
147 .parse()
148 .with_context(|| format!("Could not parse address: {addr}"))?;
149
150 let scheduler_host = opt.scheduler_host.clone();
151 let scheduler_port = opt.scheduler_port;
152 let scheduler_url = format!("http://{scheduler_host}:{scheduler_port}");
153
154 let work_dir = opt.work_dir.clone().unwrap_or(
155 TempDir::new()?
156 .into_path()
157 .into_os_string()
158 .into_string()
159 .unwrap(),
160 );
161
162 let concurrent_tasks = if opt.concurrent_tasks == 0 {
163 num_cpus::get()
165 } else {
166 opt.concurrent_tasks
167 };
168
169 info!("Running with config:");
170 info!("work_dir: {}", work_dir);
171 info!("concurrent_tasks: {}", concurrent_tasks);
172
173 let executor_id = Uuid::new_v4().to_string();
175 let executor_meta = ExecutorRegistration {
176 id: executor_id.clone(),
177 optional_host: opt
178 .external_host
179 .clone()
180 .map(executor_registration::OptionalHost::Host),
181 port: opt.port as u32,
182 grpc_port: opt.grpc_port as u32,
183 specification: Some(ExecutorSpecification {
184 resources: vec![ExecutorResource {
185 resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
186 }],
187 }),
188 };
189
190 let runtime_env = RuntimeEnvBuilder::new()
191 .with_object_store_registry(Arc::new(KapotObjectStoreRegistry::new()))
192 .with_temp_file_path(work_dir.clone())
193 .build()
194 .unwrap();
195
196 #[cfg(not(windows))]
198 let runtime_with_data_cache = {
199 let cache_dir = opt.cache_dir.clone();
200 let cache_capacity = opt.cache_capacity;
201 let cache_io_concurrency = opt.cache_io_concurrency;
202 let cache_layer =
203 opt.data_cache_policy
204 .map(|data_cache_policy| match data_cache_policy {
205 DataCachePolicy::LocalDiskFile => {
206 let cache_dir = cache_dir.unwrap();
207 let cache_layer = FileCacheLayer::new(
208 cache_capacity as usize,
209 cache_io_concurrency,
210 LocalDiskMedium::new(cache_dir),
211 );
212 CacheLayer::LocalDiskFile(Arc::new(cache_layer))
213 }
214 });
215 if let Some(cache_layer) = cache_layer {
216 let registry = Arc::new(CachedBasedObjectStoreRegistry::new(
217 runtime_env.object_store_registry.clone(),
218 cache_layer,
219 ));
220 Some(Arc::new(RuntimeEnv {
221 memory_pool: runtime_env.memory_pool.clone(),
222 disk_manager: runtime_env.disk_manager.clone(),
223 cache_manager: runtime_env.cache_manager.clone(),
224 object_store_registry: registry,
225 }))
226 } else {
227 None
228 }
229 };
230
231 let metrics_collector = Arc::new(LoggingMetricsCollector::default());
232
233 let executor = Arc::new(Executor::new(
234 executor_meta,
235 &work_dir,
236 Arc::new(runtime_env),
237 runtime_with_data_cache,
238 metrics_collector,
239 concurrent_tasks,
240 opt.execution_engine.clone(),
241 ));
242
243 let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
244 let connection = if connect_timeout == 0 {
245 create_grpc_client_connection(scheduler_url)
246 .await
247 .context("Could not connect to scheduler")
248 } else {
249 let start_time = Instant::now().elapsed().as_secs();
253 let mut x = None;
254 while x.is_none()
255 && Instant::now().elapsed().as_secs() - start_time < connect_timeout
256 {
257 match create_grpc_client_connection(scheduler_url.clone())
258 .await
259 .context("Could not connect to scheduler")
260 {
261 Ok(conn) => {
262 info!("Connected to scheduler at {}", scheduler_url);
263 x = Some(conn);
264 }
265 Err(e) => {
266 warn!(
267 "Failed to connect to scheduler at {} ({}); retrying ...",
268 scheduler_url, e
269 );
270 std::thread::sleep(time::Duration::from_millis(500));
271 }
272 }
273 }
274 match x {
275 Some(conn) => Ok(conn),
276 _ => Err(KapotError::General(format!(
277 "Timed out attempting to connect to scheduler at {scheduler_url}"
278 ))
279 .into()),
280 }
281 }?;
282
283 let mut scheduler = SchedulerGrpcClient::new(connection)
284 .max_encoding_message_size(opt.grpc_max_encoding_message_size as usize)
285 .max_decoding_message_size(opt.grpc_max_decoding_message_size as usize);
286
287 let default_codec: KapotCodec<LogicalPlanNode, PhysicalPlanNode> = KapotCodec::default();
288
289 let scheduler_policy = opt.task_scheduling_policy;
290 let job_data_ttl_seconds = opt.job_data_ttl_seconds;
291
292 let shutdown_noti = ShutdownNotifier::new();
294
295 if opt.job_data_clean_up_interval_seconds > 0 {
296 let mut interval_time =
297 time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds));
298 let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
299 let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
300 tokio::spawn(async move {
301 while !shuffle_cleaner_shutdown.is_shutdown() {
303 tokio::select! {
304 _ = interval_time.tick() => {
305 if let Err(e) = clean_shuffle_data_loop(&work_dir, job_data_ttl_seconds).await
306 {
307 error!("kapot executor fail to clean_shuffle_data {:?}", e)
308 }
309 },
310 _ = shuffle_cleaner_shutdown.recv() => {
311 if let Err(e) = clean_all_shuffle_data(&work_dir).await
312 {
313 error!("kapot executor fail to clean_shuffle_data {:?}", e)
314 } else {
315 info!("Shuffle data cleaned.");
316 }
317 drop(shuffle_cleaner_complete);
318 return;
319 }
320 };
321 }
322 });
323 }
324
325 let mut service_handlers: FuturesUnordered<JoinHandle<Result<(), KapotError>>> =
326 FuturesUnordered::new();
327
328 let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);
330
331 match scheduler_policy {
332 TaskSchedulingPolicy::PushStaged => {
333 service_handlers.push(
334 executor_server::startup(
336 scheduler.clone(),
337 opt.clone(),
338 executor.clone(),
339 default_codec,
340 stop_send,
341 &shutdown_noti,
342 )
343 .await?,
344 );
345 }
346 _ => {
347 service_handlers.push(tokio::spawn(execution_loop::poll_loop(
348 scheduler.clone(),
349 executor.clone(),
350 default_codec,
351 )));
352 }
353 };
354 service_handlers.push(tokio::spawn(flight_server_run(
355 addr,
356 shutdown_noti.subscribe_for_shutdown(),
357 )));
358
359 let tasks_drained = TasksDrainedFuture(executor);
360
361 let (notify_scheduler, stop_reason) = tokio::select! {
365 service_val = check_services(&mut service_handlers) => {
366 let msg = format!("executor services stopped with reason {service_val:?}");
367 info!("{:?}", msg);
368 (true, msg)
369 },
370 _ = signal::ctrl_c() => {
371 let msg = "executor received ctrl-c event.".to_string();
372 info!("{:?}", msg);
373 (true, msg)
374 },
375 _ = terminate::sig_term() => {
376 let msg = "executor received terminate signal.".to_string();
377 info!("{:?}", msg);
378 (true, msg)
379 },
380 _ = stop_recv.recv() => {
381 (false, "".to_string())
382 },
383 };
384
385 info!("setting executor to TERMINATING status");
387 TERMINATING.store(true, Ordering::Release);
388
389 if notify_scheduler {
390 if let Err(error) = scheduler
393 .heart_beat_from_executor(HeartBeatParams {
394 executor_id: executor_id.clone(),
395 metrics: vec![],
396 status: Some(ExecutorStatus {
397 status: Some(Status::Terminating(String::default())),
398 }),
399 metadata: Some(ExecutorRegistration {
400 id: executor_id.clone(),
401 optional_host: opt
402 .external_host
403 .clone()
404 .map(executor_registration::OptionalHost::Host),
405 port: opt.port as u32,
406 grpc_port: opt.grpc_port as u32,
407 specification: Some(ExecutorSpecification {
408 resources: vec![ExecutorResource {
409 resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
410 }],
411 }),
412 }),
413 })
414 .await
415 {
416 error!("error sending heartbeat with fenced status: {:?}", error);
417 }
418
419 if let Err(error) = scheduler
421 .executor_stopped(ExecutorStoppedParams {
422 executor_id,
423 reason: stop_reason,
424 })
425 .await
426 {
427 error!("ExecutorStopped grpc failed: {:?}", error);
428 }
429
430 tasks_drained.await;
432 }
433
434 let ShutdownNotifier {
438 mut shutdown_complete_rx,
439 shutdown_complete_tx,
440 notify_shutdown,
441 ..
442 } = shutdown_noti;
443
444 drop(notify_shutdown);
447 drop(shutdown_complete_tx);
449
450 let _ = shutdown_complete_rx.recv().await;
452 info!("Executor stopped.");
453 Ok(())
454}
455
456async fn flight_server_run(
458 addr: SocketAddr,
459 mut grpc_shutdown: Shutdown,
460) -> Result<(), KapotError> {
461 let service = KapotFlightService::new();
462 let server = FlightServiceServer::new(service);
463 info!(
464 "kapot v{} Rust Executor Flight Server listening on {:?}",
465 KAPOT_VERSION, addr
466 );
467
468 let shutdown_signal = grpc_shutdown.recv();
469 let server_future = create_grpc_server()
470 .add_service(server)
471 .serve_with_shutdown(addr, shutdown_signal);
472
473 server_future.await.map_err(|e| {
474 error!("Tonic error, Could not start Executor Flight Server.");
475 KapotError::TonicError(e)
476 })
477}
478
479async fn check_services(
481 service_handlers: &mut FuturesUnordered<JoinHandle<Result<(), KapotError>>>,
482) -> Result<(), KapotError> {
483 loop {
484 match service_handlers.next().await {
485 Some(result) => match result {
486 Ok(inner_result) => match inner_result {
488 Ok(()) => (),
489 Err(e) => return Err(e),
490 },
491 Err(e) => return Err(KapotError::TokioError(e)),
493 },
494 None => {
495 info!("service handlers are all done with their work!");
496 return Ok(());
497 }
498 }
499 }
500}
501
502async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
505 let mut dir = fs::read_dir(work_dir).await?;
506 let mut to_deleted = Vec::new();
507 while let Some(child) = dir.next_entry().await? {
508 if let Ok(metadata) = child.metadata().await {
509 let child_path = child.path().into_os_string();
510 if metadata.is_dir() {
512 match satisfy_dir_ttl(child, seconds).await {
513 Err(e) => {
514 error!(
515 "Fail to check ttl for the directory {:?} due to {:?}",
516 child_path, e
517 )
518 }
519 Ok(false) => to_deleted.push(child_path),
520 Ok(_) => {}
521 }
522 } else {
523 warn!("{:?} under the working directory is a not a directory and will be ignored when doing cleanup", child_path)
524 }
525 } else {
526 error!("Fail to get metadata for file {:?}", child.path())
527 }
528 }
529 info!(
530 "The directories {:?} that have not been modified for {:?} seconds so that they will be deleted",
531 to_deleted, seconds
532 );
533 for del in to_deleted {
534 if let Err(e) = fs::remove_dir_all(&del).await {
535 error!("Fail to remove the directory {:?} due to {}", del, e);
536 }
537 }
538 Ok(())
539}
540
541async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
543 let mut dir = fs::read_dir(work_dir).await?;
544 let mut to_deleted = Vec::new();
545 while let Some(child) = dir.next_entry().await? {
546 if let Ok(metadata) = child.metadata().await {
547 if metadata.is_dir() {
549 to_deleted.push(child.path().into_os_string())
550 }
551 } else {
552 error!("Can not get metadata from file: {:?}", child)
553 }
554 }
555
556 info!("The work_dir {:?} will be deleted", &to_deleted);
557 for del in to_deleted {
558 if let Err(e) = fs::remove_dir_all(&del).await {
559 error!("Fail to remove the directory {:?} due to {}", del, e);
560 }
561 }
562 Ok(())
563}
564
565pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
568 let cutoff = get_time_before(ttl_seconds);
569
570 let mut to_check = vec![dir];
571 while let Some(dir) = to_check.pop() {
572 if dir
574 .metadata()
575 .await?
576 .modified()?
577 .duration_since(UNIX_EPOCH)
578 .expect("Time went backwards")
579 .as_secs()
580 > cutoff
581 {
582 return Ok(true);
583 }
584 let mut children = fs::read_dir(dir.path()).await?;
586 while let Some(child) = children.next_entry().await? {
587 let metadata = child.metadata().await?;
588 if metadata.is_dir() {
589 to_check.push(child);
590 } else if metadata
591 .modified()?
592 .duration_since(UNIX_EPOCH)
593 .expect("Time went backwards")
594 .as_secs()
595 > cutoff
596 {
597 return Ok(true);
598 };
599 }
600 }
601
602 Ok(false)
603}
604
#[cfg(test)]
mod tests {
    use super::clean_shuffle_data_loop;
    use std::fs;
    use std::fs::File;
    use std::io::Write;
    use std::time::Duration;
    use tempfile::TempDir;

    /// End-to-end check of `clean_shuffle_data_loop`: a job directory whose
    /// contents are older than the TTL must be removed from the work dir.
    ///
    /// NOTE(review): the test sleeps 2 s so the job dir's mtime exceeds the
    /// 1 s TTL before cleanup runs — timing-based, so potentially flaky on a
    /// heavily loaded machine; consider setting mtimes explicitly instead.
    #[tokio::test]
    async fn test_executor_clean_up() {
        // Layout: <work_dir>/job_id/tmp.csv
        let work_dir = TempDir::new().unwrap().into_path();
        let job_dir = work_dir.as_path().join("job_id");
        let file_path = job_dir.as_path().join("tmp.csv");
        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
        Andrew,2018-11-13T17:11:10.011Z";
        fs::create_dir(job_dir).unwrap();
        File::create(&file_path)
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");

        let work_dir_clone = work_dir.clone();

        // Before cleanup: exactly the one job directory.
        let count1 = fs::read_dir(work_dir.clone()).unwrap().count();
        assert_eq!(count1, 1);
        let mut handles = vec![];
        handles.push(tokio::spawn(async move {
            // Age the directory past the 1-second TTL, then clean.
            tokio::time::sleep(Duration::from_secs(2)).await;
            clean_shuffle_data_loop(work_dir_clone.to_str().unwrap(), 1)
                .await
                .unwrap();
        }));
        futures::future::join_all(handles).await;
        // After cleanup: the expired job directory is gone.
        let count2 = fs::read_dir(work_dir.clone()).unwrap().count();
        assert_eq!(count2, 0);
    }
}