// kapot_executor/executor_process.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! kapot Executor Process
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
use std::{env, io};

use anyhow::{Context, Result};
use arrow_flight::flight_service_server::FlightServiceServer;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use kapot_core::object_store_registry::KapotObjectStoreRegistry;
use log::{error, info, warn};
use tempfile::TempDir;
use tokio::fs::DirEntry;
use tokio::signal;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::{fs, time};
use tracing_subscriber::EnvFilter;
use uuid::Uuid;

use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};

#[cfg(not(windows))]
use kapot_core::cache_layer::{
    medium::local_disk::LocalDiskMedium, policy::file::FileCacheLayer, CacheLayer,
};
use kapot_core::config::{DataCachePolicy, LogRotationPolicy, TaskSchedulingPolicy};
use kapot_core::error::KapotError;
#[cfg(not(windows))]
use kapot_core::object_store_registry::cache::CachedBasedObjectStoreRegistry;
use kapot_core::serde::protobuf::executor_resource::Resource;
use kapot_core::serde::protobuf::executor_status::Status;
use kapot_core::serde::protobuf::{
    executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
    ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus,
    ExecutorStoppedParams, HeartBeatParams,
};
use kapot_core::serde::KapotCodec;
use kapot_core::utils::{
    create_grpc_client_connection, create_grpc_server, get_time_before,
};
use kapot_core::KAPOT_VERSION;

use crate::execution_engine::ExecutionEngine;
use crate::executor::{Executor, TasksDrainedFuture};
use crate::executor_server::TERMINATING;
use crate::flight_service::KapotFlightService;
use crate::metrics::LoggingMetricsCollector;
use crate::shutdown::Shutdown;
use crate::shutdown::ShutdownNotifier;
use crate::terminate;
use crate::{execution_loop, executor_server};
74
/// Configuration for a kapot executor process.
///
/// Consumed by `start_executor_process`, which uses these settings for
/// logging, scheduler registration, the Arrow Flight server, shuffle-data
/// cleanup, and the optional data-cache layer.
pub struct ExecutorProcessConfig {
    /// Host/interface the Arrow Flight server binds to (combined with `port`).
    pub bind_host: String,
    /// Externally reachable host advertised to the scheduler during
    /// registration; when `None`, no host is sent in the registration.
    pub external_host: Option<String>,
    /// Arrow Flight server port; also reported to the scheduler.
    pub port: u16,
    /// Executor gRPC service port reported to the scheduler.
    pub grpc_port: u16,
    /// Scheduler hostname used to build the scheduler gRPC URL.
    pub scheduler_host: String,
    /// Scheduler port used to build the scheduler gRPC URL.
    pub scheduler_port: u16,
    /// How long to keep retrying the initial scheduler connection.
    /// `0` means a single attempt that fails immediately on error.
    pub scheduler_connect_timeout_seconds: u16,
    /// Number of task slots to register; `0` means use all available cores.
    pub concurrent_tasks: usize,
    /// `PushStaged` starts the executor gRPC server; any other policy runs
    /// the scheduler poll loop instead.
    pub task_scheduling_policy: TaskSchedulingPolicy,
    /// When `Some`, logs go to rolling files in this directory; when `None`,
    /// logs go to stdout.
    pub log_dir: Option<String>,
    /// Working directory for shuffle data; a temp dir is used when `None`.
    pub work_dir: Option<String>,
    /// Log filter applied when the `RUST_LOG` environment variable is unset.
    pub special_mod_log_level: String,
    /// Whether to include thread names/ids in log output.
    pub print_thread_info: bool,
    /// File name prefix for the rolling log appender (file logging only).
    pub log_file_name_prefix: String,
    /// Rotation cadence for the rolling log appender (file logging only).
    pub log_rotation_policy: LogRotationPolicy,
    /// Job shuffle directories unmodified for longer than this are deleted
    /// by the periodic cleanup loop.
    pub job_data_ttl_seconds: u64,
    /// Interval between cleanup runs; `0` disables the cleanup loop.
    pub job_data_clean_up_interval_seconds: u64,
    /// Optional data-cache policy; `LocalDiskFile` requires `cache_dir`.
    /// Not supported on Windows.
    pub data_cache_policy: Option<DataCachePolicy>,
    /// Directory backing the local-disk cache medium (required when
    /// `data_cache_policy` is `LocalDiskFile`).
    pub cache_dir: Option<String>,
    /// Capacity of the data cache, in bytes.
    pub cache_capacity: u64,
    /// I/O concurrency of the data cache layer.
    pub cache_io_concurrency: u32,
    /// The maximum size of a decoded message
    pub grpc_max_decoding_message_size: u32,
    /// The maximum size of an encoded message
    pub grpc_max_encoding_message_size: u32,
    // NOTE(review): not referenced in this file; presumably drives the
    // heartbeat loop in executor_server — confirm there.
    pub executor_heartbeat_interval_seconds: u64,
    /// Optional execution engine to use to execute physical plans, will default to
    /// DataFusion if none is provided.
    pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
}
106
107pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
108    let rust_log = env::var(EnvFilter::DEFAULT_ENV);
109    let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone()));
110    
111    // File layer
112    if let Some(log_dir) = opt.log_dir.clone() {
113        let log_file = match opt.log_rotation_policy {
114            LogRotationPolicy::Minutely => {
115                tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix)
116            }
117            LogRotationPolicy::Hourly => {
118                tracing_appender::rolling::hourly(log_dir, &opt.log_file_name_prefix)
119            }
120            LogRotationPolicy::Daily => {
121                tracing_appender::rolling::daily(log_dir, &opt.log_file_name_prefix)
122            }
123            LogRotationPolicy::Never => {
124                tracing_appender::rolling::never(log_dir, &opt.log_file_name_prefix)
125            }
126        };
127        tracing_subscriber::fmt()
128            .with_ansi(false)
129            .with_thread_names(opt.print_thread_info)
130            .with_thread_ids(opt.print_thread_info)
131            .with_writer(log_file)
132            .with_env_filter(log_filter)
133            .init();
134    } else {
135        // Console layer
136        tracing_subscriber::fmt()
137            .with_ansi(false)
138            .with_thread_names(opt.print_thread_info)
139            .with_thread_ids(opt.print_thread_info)
140            .with_writer(io::stdout)
141            .with_env_filter(log_filter)
142            .init();
143    }
144
145    let addr = format!("{}:{}", opt.bind_host, opt.port);
146    let addr = addr
147        .parse()
148        .with_context(|| format!("Could not parse address: {addr}"))?;
149
150    let scheduler_host = opt.scheduler_host.clone();
151    let scheduler_port = opt.scheduler_port;
152    let scheduler_url = format!("http://{scheduler_host}:{scheduler_port}");
153
154    let work_dir = opt.work_dir.clone().unwrap_or(
155        TempDir::new()?
156            .into_path()
157            .into_os_string()
158            .into_string()
159            .unwrap(),
160    );
161
162    let concurrent_tasks = if opt.concurrent_tasks == 0 {
163        // use all available cores if no concurrency level is specified
164        num_cpus::get()
165    } else {
166        opt.concurrent_tasks
167    };
168
169    info!("Running with config:");
170    info!("work_dir: {}", work_dir);
171    info!("concurrent_tasks: {}", concurrent_tasks);
172
173    // assign this executor an unique ID
174    let executor_id = Uuid::new_v4().to_string();
175    let executor_meta = ExecutorRegistration {
176        id: executor_id.clone(),
177        optional_host: opt
178            .external_host
179            .clone()
180            .map(executor_registration::OptionalHost::Host),
181        port: opt.port as u32,
182        grpc_port: opt.grpc_port as u32,
183        specification: Some(ExecutorSpecification {
184            resources: vec![ExecutorResource {
185                resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
186            }],
187        }),
188    };
189
190    let runtime_env = RuntimeEnvBuilder::new()
191        .with_object_store_registry(Arc::new(KapotObjectStoreRegistry::new()))
192        .with_temp_file_path(work_dir.clone())
193        .build()
194        .unwrap();
195    
196    // Set the object store registry
197    #[cfg(not(windows))]
198    let runtime_with_data_cache = {
199        let cache_dir = opt.cache_dir.clone();
200        let cache_capacity = opt.cache_capacity;
201        let cache_io_concurrency = opt.cache_io_concurrency;
202        let cache_layer =
203            opt.data_cache_policy
204                .map(|data_cache_policy| match data_cache_policy {
205                    DataCachePolicy::LocalDiskFile => {
206                        let cache_dir = cache_dir.unwrap();
207                        let cache_layer = FileCacheLayer::new(
208                            cache_capacity as usize,
209                            cache_io_concurrency,
210                            LocalDiskMedium::new(cache_dir),
211                        );
212                        CacheLayer::LocalDiskFile(Arc::new(cache_layer))
213                    }
214                });
215        if let Some(cache_layer) = cache_layer {
216            let registry = Arc::new(CachedBasedObjectStoreRegistry::new(
217                runtime_env.object_store_registry.clone(),
218                cache_layer,
219            ));
220            Some(Arc::new(RuntimeEnv {
221                memory_pool: runtime_env.memory_pool.clone(),
222                disk_manager: runtime_env.disk_manager.clone(),
223                cache_manager: runtime_env.cache_manager.clone(),
224                object_store_registry: registry,
225            }))
226        } else {
227            None
228        }
229    };
230
231    let metrics_collector = Arc::new(LoggingMetricsCollector::default());
232
233    let executor = Arc::new(Executor::new(
234        executor_meta,
235        &work_dir,
236        Arc::new(runtime_env),
237        runtime_with_data_cache,
238        metrics_collector,
239        concurrent_tasks,
240        opt.execution_engine.clone(),
241    ));
242
243    let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
244    let connection = if connect_timeout == 0 {
245        create_grpc_client_connection(scheduler_url)
246            .await
247            .context("Could not connect to scheduler")
248    } else {
249        // this feature was added to support docker-compose so that we can have the executor
250        // wait for the scheduler to start, or at least run for 10 seconds before failing so
251        // that docker-compose's restart policy will restart the container.
252        let start_time = Instant::now().elapsed().as_secs();
253        let mut x = None;
254        while x.is_none()
255            && Instant::now().elapsed().as_secs() - start_time < connect_timeout
256        {
257            match create_grpc_client_connection(scheduler_url.clone())
258                .await
259                .context("Could not connect to scheduler")
260            {
261                Ok(conn) => {
262                    info!("Connected to scheduler at {}", scheduler_url);
263                    x = Some(conn);
264                }
265                Err(e) => {
266                    warn!(
267                        "Failed to connect to scheduler at {} ({}); retrying ...",
268                        scheduler_url, e
269                    );
270                    std::thread::sleep(time::Duration::from_millis(500));
271                }
272            }
273        }
274        match x {
275            Some(conn) => Ok(conn),
276            _ => Err(KapotError::General(format!(
277                "Timed out attempting to connect to scheduler at {scheduler_url}"
278            ))
279            .into()),
280        }
281    }?;
282
283    let mut scheduler = SchedulerGrpcClient::new(connection)
284        .max_encoding_message_size(opt.grpc_max_encoding_message_size as usize)
285        .max_decoding_message_size(opt.grpc_max_decoding_message_size as usize);
286
287    let default_codec: KapotCodec<LogicalPlanNode, PhysicalPlanNode> = KapotCodec::default();
288
289    let scheduler_policy = opt.task_scheduling_policy;
290    let job_data_ttl_seconds = opt.job_data_ttl_seconds;
291
292    // Graceful shutdown notification
293    let shutdown_noti = ShutdownNotifier::new();
294
295    if opt.job_data_clean_up_interval_seconds > 0 {
296        let mut interval_time =
297            time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds));
298        let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
299        let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
300        tokio::spawn(async move {
301            // As long as the shutdown notification has not been received
302            while !shuffle_cleaner_shutdown.is_shutdown() {
303                tokio::select! {
304                    _ = interval_time.tick() => {
305                            if let Err(e) = clean_shuffle_data_loop(&work_dir, job_data_ttl_seconds).await
306                        {
307                            error!("kapot executor fail to clean_shuffle_data {:?}", e)
308                        }
309                        },
310                    _ = shuffle_cleaner_shutdown.recv() => {
311                        if let Err(e) = clean_all_shuffle_data(&work_dir).await
312                        {
313                            error!("kapot executor fail to clean_shuffle_data {:?}", e)
314                        } else {
315                            info!("Shuffle data cleaned.");
316                        }
317                        drop(shuffle_cleaner_complete);
318                        return;
319                    }
320                };
321            }
322        });
323    }
324
325    let mut service_handlers: FuturesUnordered<JoinHandle<Result<(), KapotError>>> =
326        FuturesUnordered::new();
327
328    // Channels used to receive stop requests from Executor grpc service.
329    let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);
330
331    match scheduler_policy {
332        TaskSchedulingPolicy::PushStaged => {
333            service_handlers.push(
334                //If there is executor registration error during startup, return the error and stop early.
335                executor_server::startup(
336                    scheduler.clone(),
337                    opt.clone(),
338                    executor.clone(),
339                    default_codec,
340                    stop_send,
341                    &shutdown_noti,
342                )
343                .await?,
344            );
345        }
346        _ => {
347            service_handlers.push(tokio::spawn(execution_loop::poll_loop(
348                scheduler.clone(),
349                executor.clone(),
350                default_codec,
351            )));
352        }
353    };
354    service_handlers.push(tokio::spawn(flight_server_run(
355        addr,
356        shutdown_noti.subscribe_for_shutdown(),
357    )));
358
359    let tasks_drained = TasksDrainedFuture(executor);
360
361    // Concurrently run the service checking and listen for the `shutdown` signal and wait for the stop request coming.
362    // The check_services runs until an error is encountered, so under normal circumstances, this `select!` statement runs
363    // until the `shutdown` signal is received or a stop request is coming.
364    let (notify_scheduler, stop_reason) = tokio::select! {
365        service_val = check_services(&mut service_handlers) => {
366            let msg = format!("executor services stopped with reason {service_val:?}");
367            info!("{:?}", msg);
368            (true, msg)
369        },
370        _ = signal::ctrl_c() => {
371            let msg = "executor received ctrl-c event.".to_string();
372             info!("{:?}", msg);
373            (true, msg)
374        },
375        _ = terminate::sig_term() => {
376            let msg = "executor received terminate signal.".to_string();
377             info!("{:?}", msg);
378            (true, msg)
379        },
380        _ = stop_recv.recv() => {
381            (false, "".to_string())
382        },
383    };
384
385    // Set status to fenced
386    info!("setting executor to TERMINATING status");
387    TERMINATING.store(true, Ordering::Release);
388
389    if notify_scheduler {
390        // Send a heartbeat to update status of executor to `Fenced`. This should signal to the
391        // scheduler to no longer schedule tasks on this executor
392        if let Err(error) = scheduler
393            .heart_beat_from_executor(HeartBeatParams {
394                executor_id: executor_id.clone(),
395                metrics: vec![],
396                status: Some(ExecutorStatus {
397                    status: Some(Status::Terminating(String::default())),
398                }),
399                metadata: Some(ExecutorRegistration {
400                    id: executor_id.clone(),
401                    optional_host: opt
402                        .external_host
403                        .clone()
404                        .map(executor_registration::OptionalHost::Host),
405                    port: opt.port as u32,
406                    grpc_port: opt.grpc_port as u32,
407                    specification: Some(ExecutorSpecification {
408                        resources: vec![ExecutorResource {
409                            resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
410                        }],
411                    }),
412                }),
413            })
414            .await
415        {
416            error!("error sending heartbeat with fenced status: {:?}", error);
417        }
418
419        // TODO we probably don't need a separate rpc call for this....
420        if let Err(error) = scheduler
421            .executor_stopped(ExecutorStoppedParams {
422                executor_id,
423                reason: stop_reason,
424            })
425            .await
426        {
427            error!("ExecutorStopped grpc failed: {:?}", error);
428        }
429
430        // Wait for tasks to drain
431        tasks_drained.await;
432    }
433
434    // Extract the `shutdown_complete` receiver and transmitter
435    // explicitly drop `shutdown_transmitter`. This is important, as the
436    // `.await` below would otherwise never complete.
437    let ShutdownNotifier {
438        mut shutdown_complete_rx,
439        shutdown_complete_tx,
440        notify_shutdown,
441        ..
442    } = shutdown_noti;
443
444    // When `notify_shutdown` is dropped, all components which have `subscribe`d will
445    // receive the shutdown signal and can exit
446    drop(notify_shutdown);
447    // Drop final `Sender` so the `Receiver` below can complete
448    drop(shutdown_complete_tx);
449
450    // Wait for all related components to finish the shutdown processing.
451    let _ = shutdown_complete_rx.recv().await;
452    info!("Executor stopped.");
453    Ok(())
454}
455
456// Arrow flight service
457async fn flight_server_run(
458    addr: SocketAddr,
459    mut grpc_shutdown: Shutdown,
460) -> Result<(), KapotError> {
461    let service = KapotFlightService::new();
462    let server = FlightServiceServer::new(service);
463    info!(
464        "kapot v{} Rust Executor Flight Server listening on {:?}",
465        KAPOT_VERSION, addr
466    );
467
468    let shutdown_signal = grpc_shutdown.recv();
469    let server_future = create_grpc_server()
470        .add_service(server)
471        .serve_with_shutdown(addr, shutdown_signal);
472
473    server_future.await.map_err(|e| {
474        error!("Tonic error, Could not start Executor Flight Server.");
475        KapotError::TonicError(e)
476    })
477}
478
479// Check the status of long running services
480async fn check_services(
481    service_handlers: &mut FuturesUnordered<JoinHandle<Result<(), KapotError>>>,
482) -> Result<(), KapotError> {
483    loop {
484        match service_handlers.next().await {
485            Some(result) => match result {
486                // React to "inner_result", i.e. propagate as kapotError
487                Ok(inner_result) => match inner_result {
488                    Ok(()) => (),
489                    Err(e) => return Err(e),
490                },
491                // React to JoinError
492                Err(e) => return Err(KapotError::TokioError(e)),
493            },
494            None => {
495                info!("service handlers are all done with their work!");
496                return Ok(());
497            }
498        }
499    }
500}
501
502/// This function will be scheduled periodically for cleanup the job shuffle data left on the executor.
503/// Only directories will be checked cleaned.
504async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
505    let mut dir = fs::read_dir(work_dir).await?;
506    let mut to_deleted = Vec::new();
507    while let Some(child) = dir.next_entry().await? {
508        if let Ok(metadata) = child.metadata().await {
509            let child_path = child.path().into_os_string();
510            // only delete the job dir
511            if metadata.is_dir() {
512                match satisfy_dir_ttl(child, seconds).await {
513                    Err(e) => {
514                        error!(
515                            "Fail to check ttl for the directory {:?} due to {:?}",
516                            child_path, e
517                        )
518                    }
519                    Ok(false) => to_deleted.push(child_path),
520                    Ok(_) => {}
521                }
522            } else {
523                warn!("{:?} under the working directory is a not a directory and will be ignored when doing cleanup", child_path)
524            }
525        } else {
526            error!("Fail to get metadata for file {:?}", child.path())
527        }
528    }
529    info!(
530        "The directories {:?} that have not been modified for {:?} seconds so that they will be deleted",
531        to_deleted, seconds
532    );
533    for del in to_deleted {
534        if let Err(e) = fs::remove_dir_all(&del).await {
535            error!("Fail to remove the directory {:?} due to {}", del, e);
536        }
537    }
538    Ok(())
539}
540
541/// This function will clean up all shuffle data on this executor
542async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
543    let mut dir = fs::read_dir(work_dir).await?;
544    let mut to_deleted = Vec::new();
545    while let Some(child) = dir.next_entry().await? {
546        if let Ok(metadata) = child.metadata().await {
547            // only delete the job dir
548            if metadata.is_dir() {
549                to_deleted.push(child.path().into_os_string())
550            }
551        } else {
552            error!("Can not get metadata from file: {:?}", child)
553        }
554    }
555
556    info!("The work_dir {:?} will be deleted", &to_deleted);
557    for del in to_deleted {
558        if let Err(e) = fs::remove_dir_all(&del).await {
559            error!("Fail to remove the directory {:?} due to {}", del, e);
560        }
561    }
562    Ok(())
563}
564
565/// Determines if a directory contains files newer than the cutoff time.
566/// If return true, it means the directory contains files newer than the cutoff time. It satisfy the ttl and should not be deleted.
567pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
568    let cutoff = get_time_before(ttl_seconds);
569
570    let mut to_check = vec![dir];
571    while let Some(dir) = to_check.pop() {
572        // Check the ttl for the current directory first
573        if dir
574            .metadata()
575            .await?
576            .modified()?
577            .duration_since(UNIX_EPOCH)
578            .expect("Time went backwards")
579            .as_secs()
580            > cutoff
581        {
582            return Ok(true);
583        }
584        // Check its children
585        let mut children = fs::read_dir(dir.path()).await?;
586        while let Some(child) = children.next_entry().await? {
587            let metadata = child.metadata().await?;
588            if metadata.is_dir() {
589                to_check.push(child);
590            } else if metadata
591                .modified()?
592                .duration_since(UNIX_EPOCH)
593                .expect("Time went backwards")
594                .as_secs()
595                > cutoff
596            {
597                return Ok(true);
598            };
599        }
600    }
601
602    Ok(false)
603}
604
#[cfg(test)]
mod tests {
    use super::clean_shuffle_data_loop;
    use std::fs;
    use std::fs::File;
    use std::io::Write;
    use std::time::Duration;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_executor_clean_up() {
        // Lay out <work_dir>/job_id/tmp.csv so the cleaner finds one job dir.
        let work_dir = TempDir::new().unwrap().into_path();
        let job_dir = work_dir.as_path().join("job_id");
        let file_path = job_dir.as_path().join("tmp.csv");
        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
                    Andrew,2018-11-13T17:11:10.011Z";
        fs::create_dir(job_dir).unwrap();
        File::create(&file_path)
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");

        // Exactly one entry (the job dir) exists before cleanup.
        assert_eq!(fs::read_dir(&work_dir).unwrap().count(), 1);

        // Wait past the 1-second TTL, then run the cleaner once.
        tokio::time::sleep(Duration::from_secs(2)).await;
        clean_shuffle_data_loop(work_dir.to_str().unwrap(), 1)
            .await
            .unwrap();

        // The expired job dir has been removed.
        assert_eq!(fs::read_dir(&work_dir).unwrap().count(), 0);
    }
}