kapot_executor/
standalone.rs1use crate::metrics::LoggingMetricsCollector;
19use crate::{execution_loop, executor::Executor, flight_service::KapotFlightService};
20use arrow_flight::flight_service_server::FlightServiceServer;
21use kapot_core::object_store_registry::KapotObjectStoreRegistry;
22use kapot_core::{
23 error::Result,
24 serde::protobuf::executor_registration::OptionalHost,
25 serde::protobuf::{scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration},
26 serde::scheduler::ExecutorSpecification,
27 serde::KapotCodec,
28 utils::create_grpc_server,
29 KAPOT_VERSION,
30};
31use datafusion::execution::runtime_env::RuntimeEnvBuilder;
32use datafusion_proto::logical_plan::AsLogicalPlan;
33use datafusion_proto::physical_plan::AsExecutionPlan;
34use log::info;
35use std::sync::Arc;
36use tempfile::TempDir;
37use tokio::net::TcpListener;
38use tonic::transport::Channel;
39use uuid::Uuid;
40
41pub async fn new_standalone_executor<
42 T: 'static + AsLogicalPlan,
43 U: 'static + AsExecutionPlan,
44>(
45 scheduler: SchedulerGrpcClient<Channel>,
46 concurrent_tasks: usize,
47 codec: KapotCodec<T, U>,
48) -> Result<()> {
49 let listener = TcpListener::bind("0.0.0.0:0").await?;
51 let addr = listener.local_addr()?;
52 info!(
53 "kapot v{} Rust Executor listening on {:?}",
54 KAPOT_VERSION, addr
55 );
56
57 let executor_meta = ExecutorRegistration {
58 id: Uuid::new_v4().to_string(), optional_host: Some(OptionalHost::Host("0.0.0.0".to_string())),
60 port: addr.port() as u32,
61 grpc_port: 50020,
63 specification: Some(
64 ExecutorSpecification {
65 task_slots: concurrent_tasks as u32,
66 }
67 .into(),
68 ),
69 };
70 let work_dir = TempDir::new()?
71 .into_path()
72 .into_os_string()
73 .into_string()
74 .unwrap();
75 info!("work_dir: {}", work_dir);
76
77 let runtime = RuntimeEnvBuilder::new()
78 .with_temp_file_path(work_dir.clone())
79 .with_object_store_registry(Arc::new(KapotObjectStoreRegistry::new()))
80 .build()?;
81
82 let executor = Arc::new(Executor::new(
83 executor_meta,
84 &work_dir,
85 Arc::new(runtime),
86 None,
87 Arc::new(LoggingMetricsCollector::default()),
88 concurrent_tasks,
89 None,
90 ));
91
92 let service = KapotFlightService::new();
93 let server = FlightServiceServer::new(service);
94 tokio::spawn(
95 create_grpc_server()
96 .add_service(server)
97 .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(
98 listener,
99 )),
100 );
101
102 tokio::spawn(execution_loop::poll_loop(scheduler, executor, codec));
103 Ok(())
104}