// kapot_executor/standalone.rs
use crate::metrics::LoggingMetricsCollector;
use crate::{execution_loop, executor::Executor, flight_service::KapotFlightService};
use arrow_flight::flight_service_server::FlightServiceServer;
use kapot_core::object_store_registry::KapotObjectStoreRegistry;
use kapot_core::{
error::Result,
serde::protobuf::executor_registration::OptionalHost,
serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration},
serde::scheduler::ExecutorSpecification,
serde::KapotCodec,
utils::create_grpc_server,
KAPOT_VERSION,
};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion_proto::logical_plan::AsLogicalPlan;
use datafusion_proto::physical_plan::AsExecutionPlan;
use log::info;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::TcpListener;
use tonic::transport::Channel;
use uuid::Uuid;
pub async fn new_standalone_executor<
T: 'static + AsLogicalPlan,
U: 'static + AsExecutionPlan,
>(
scheduler: SchedulerGrpcClient<Channel>,
concurrent_tasks: usize,
codec: KapotCodec<T, U>,
) -> Result<()> {
let listener = TcpListener::bind("0.0.0.0:0").await?;
let addr = listener.local_addr()?;
info!(
"kapot v{} Rust Executor listening on {:?}",
KAPOT_VERSION, addr
);
let executor_meta = ExecutorRegistration {
id: Uuid::new_v4().to_string(), optional_host: Some(OptionalHost::Host("0.0.0.0".to_string())),
port: addr.port() as u32,
grpc_port: 50020,
specification: Some(
ExecutorSpecification {
task_slots: concurrent_tasks as u32,
}
.into(),
),
};
let work_dir = TempDir::new()?
.into_path()
.into_os_string()
.into_string()
.unwrap();
info!("work_dir: {}", work_dir);
let runtime = RuntimeEnvBuilder::new()
.with_temp_file_path(work_dir.clone())
.with_object_store_registry(Arc::new(KapotObjectStoreRegistry::new()))
.build()?;
let executor = Arc::new(Executor::new(
executor_meta,
&work_dir,
Arc::new(runtime),
None,
Arc::new(LoggingMetricsCollector::default()),
concurrent_tasks,
None,
));
let service = KapotFlightService::new();
let server = FlightServiceServer::new(service);
tokio::spawn(
create_grpc_server()
.add_service(server)
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
listener,
)),
);
tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
Ok(())
}