1#![warn(clippy::pedantic)]
2
3use std::future::IntoFuture;
8use std::sync::Arc;
9
10use context::session_manager::SessionContextManager;
11use log::Level;
12use plugin::plugin_manager::{PluginManager, PLUGIN_MANAGER};
13
14use crate::server::{interval_worker, signal_handler};
15use crate::settings::{Settings, LAZY_SETTINGS};
16use crate::statistics::{Statistics, LAZY_STATISTICS};
17
18mod context;
19mod data_source;
20mod plugin;
21mod request;
22mod response;
23mod server;
24pub mod settings;
25mod statistics;
26
27#[cfg(any(not(feature = "flight"), not(feature = "telemetry")))]
28type BoxedFuture =
29 std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), anyhow::Error>> + Send>>;
30
31#[tokio::main]
44pub async fn execute(settings: Settings) -> anyhow::Result<()> {
45 LAZY_SETTINGS
46 .set(settings.init_global_managers()?)
47 .map_err(|_| anyhow::anyhow!("Can not initialize configurations"))?;
48
49 LAZY_STATISTICS
50 .set(Statistics::new())
51 .map_err(|_| anyhow::anyhow!("Can not initialize statistics"))?;
52
53 simple_logger::init_with_level(Settings::global().log.level().unwrap_or(Level::Info))?;
54
55 let plugin_mgr = PluginManager::new()?;
56
57 PLUGIN_MANAGER
58 .set(plugin_mgr)
59 .map_err(|_| anyhow::anyhow!("Can not initialize plugin manager"))?;
60
61 let session_mgr = Arc::new(tokio::sync::Mutex::new(SessionContextManager::new()));
62
63 let (http_server, http_addr) =
64 server::http::create_server::<SessionContextManager>(session_mgr.clone()).await?;
65
66 #[cfg(feature = "flight")]
67 let (flight_server, flight_addr) =
68 server::flight::create_server::<SessionContextManager>(&session_mgr.clone())?;
69
70 #[cfg(feature = "telemetry")]
71 let (metrics_server, metrics_addr) = server::metrics::create_server().await?;
72
73 log::info!("datafusion-server v{} started", env!("CARGO_PKG_VERSION"));
74 log::info!("http service listening on {http_addr:?}");
75 #[cfg(feature = "flight")]
76 log::info!("flight gRPC service listening on {flight_addr:?}");
77 #[cfg(feature = "telemetry")]
78 log::info!("metrics service listening on {metrics_addr:?}");
79 log::debug!("with config: {}", Settings::global().debug());
80
81 let http_service =
82 http_server.with_graceful_shutdown(signal_handler::register_shutdown_signal());
83
84 #[cfg(feature = "flight")]
85 let flight_service = Some(Box::pin(
86 tonic::transport::Server::builder()
87 .add_service(flight_server)
88 .serve(flight_addr),
89 ));
90 #[cfg(not(feature = "flight"))]
91 let flight_service: Option<BoxedFuture> = None;
92
93 #[cfg(feature = "telemetry")]
94 let metrics_service = Some(metrics_server.into_future());
95 #[cfg(not(feature = "telemetry"))]
96 let metrics_service: Option<BoxedFuture> = None;
97
98 tokio::select! {
99 http_result = http_service.into_future() => if let Err(e) = http_result {
100 log::error!("Can not initialize http server: {e:?}");
101 return Err(anyhow::anyhow!("http server initialization error: {:?}", e));
102 },
103 flight_result = async {
104 if let Some(future) = flight_service {
105 future.await
106 } else {
107 futures::future::pending().await
108 }
109 } => if let Err(e) = flight_result {
110 log::error!("Can not initialize flight gRPC server: {e:?}");
111 return Err(anyhow::anyhow!("flight server initialization error: {:?}", e));
112 },
113 metrics_result = async {
114 if let Some(future) = metrics_service {
115 future.await
116 } else {
117 futures::future::pending().await
118 }
119 } => if let Err(e) = metrics_result {
120 log::error!("Can not initialize metrics server: {e:?}");
121 return Err(anyhow::anyhow!("metrics server initialization error: {e:?}"));
122 },
123 () = interval_worker::cleanup_and_update_metrics(session_mgr) => {},
124 }
125
126 log::info!("Server terminated");
127
128 Ok(())
129}