kapot_executor/
standalone.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::metrics::LoggingMetricsCollector;
19use crate::{execution_loop, executor::Executor, flight_service::KapotFlightService};
20use arrow_flight::flight_service_server::FlightServiceServer;
21use kapot_core::object_store_registry::KapotObjectStoreRegistry;
22use kapot_core::{
23    error::Result,
24    serde::protobuf::executor_registration::OptionalHost,
25    serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration},
26    serde::scheduler::ExecutorSpecification,
27    serde::KapotCodec,
28    utils::create_grpc_server,
29    KAPOT_VERSION,
30};
31use datafusion::execution::runtime_env::RuntimeEnvBuilder;
32use datafusion_proto::logical_plan::AsLogicalPlan;
33use datafusion_proto::physical_plan::AsExecutionPlan;
34use log::info;
35use std::sync::Arc;
36use tempfile::TempDir;
37use tokio::net::TcpListener;
38use tonic::transport::Channel;
39use uuid::Uuid;
40
41pub async fn new_standalone_executor<
42    T: 'static + AsLogicalPlan,
43    U: 'static + AsExecutionPlan,
44>(
45    scheduler: SchedulerGrpcClient<Channel>,
46    concurrent_tasks: usize,
47    codec: KapotCodec<T, U>,
48) -> Result<()> {
49    // Let the OS assign a random, free port
50    let listener = TcpListener::bind("0.0.0.0:0").await?;
51    let addr = listener.local_addr()?;
52    info!(
53        "kapot v{} Rust Executor listening on {:?}",
54        KAPOT_VERSION, addr
55    );
56
57    let executor_meta = ExecutorRegistration {
58        id: Uuid::new_v4().to_string(), // assign this executor a unique ID
59        optional_host: Some(OptionalHost::Host("0.0.0.0".to_string())),
60        port: addr.port() as u32,
61        // TODO Make it configurable
62        grpc_port: 50020,
63        specification: Some(
64            ExecutorSpecification {
65                task_slots: concurrent_tasks as u32,
66            }
67            .into(),
68        ),
69    };
70    let work_dir = TempDir::new()?
71        .into_path()
72        .into_os_string()
73        .into_string()
74        .unwrap();
75    info!("work_dir: {}", work_dir);
76
77    let runtime = RuntimeEnvBuilder::new()
78        .with_temp_file_path(work_dir.clone())
79        .with_object_store_registry(Arc::new(KapotObjectStoreRegistry::new()))
80        .build()?;
81
82    let executor = Arc::new(Executor::new(
83        executor_meta,
84        &work_dir,
85        Arc::new(runtime),
86        None,
87        Arc::new(LoggingMetricsCollector::default()),
88        concurrent_tasks,
89        None,
90    ));
91
92    let service = KapotFlightService::new();
93    let server = FlightServiceServer::new(service);
94    tokio::spawn(
95        create_grpc_server()
96            .add_service(server)
97            .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
98                listener,
99            )),
100    );
101
102    tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
103    Ok(())
104}