datafusion_server/
lib.rs

1#![warn(clippy::pedantic)]
2
3// datafusion-server - Arrow and Large Datasets Web Server
4// Sasaki, Naoki <nsasaki@sal.co.jp> October 16, 2022
5//
6
7use std::future::IntoFuture;
8use std::sync::Arc;
9
10use context::session_manager::SessionContextManager;
11use log::Level;
12use plugin::plugin_manager::{PluginManager, PLUGIN_MANAGER};
13
14use crate::server::{interval_worker, signal_handler};
15use crate::settings::{Settings, LAZY_SETTINGS};
16use crate::statistics::{Statistics, LAZY_STATISTICS};
17
18mod context;
19mod data_source;
20mod plugin;
21mod request;
22mod response;
23mod server;
24pub mod settings;
25mod statistics;
26
27#[cfg(any(not(feature = "flight"), not(feature = "telemetry")))]
28type BoxedFuture =
29    std::pin::Pin<Box<dyn std::future::Future<Output = Result<(), anyhow::Error>> + Send>>;
30
31/// ## Errors
32/// Initializing errors belows:
33/// * Configuration
34/// * Statistics Manager
35/// * Logging System
36/// * DataFusion Session Manager
37/// * Python Plugin Manager (feature = "plugin" only)
38/// * HTTP socket binding
39/// * gRPC socket binding (feature = "flight" only)
40///
41/// ## Panics
42/// * Unknown errors
43#[tokio::main]
44pub async fn execute(settings: Settings) -> anyhow::Result<()> {
45    LAZY_SETTINGS
46        .set(settings.init_global_managers()?)
47        .map_err(|_| anyhow::anyhow!("Can not initialize configurations"))?;
48
49    LAZY_STATISTICS
50        .set(Statistics::new())
51        .map_err(|_| anyhow::anyhow!("Can not initialize statistics"))?;
52
53    simple_logger::init_with_level(Settings::global().log.level().unwrap_or(Level::Info))?;
54
55    let plugin_mgr = PluginManager::new()?;
56
57    PLUGIN_MANAGER
58        .set(plugin_mgr)
59        .map_err(|_| anyhow::anyhow!("Can not initialize plugin manager"))?;
60
61    let session_mgr = Arc::new(tokio::sync::Mutex::new(SessionContextManager::new()));
62
63    let (http_server, http_addr) =
64        server::http::create_server::<SessionContextManager>(session_mgr.clone()).await?;
65
66    #[cfg(feature = "flight")]
67    let (flight_server, flight_addr) =
68        server::flight::create_server::<SessionContextManager>(&session_mgr.clone())?;
69
70    #[cfg(feature = "telemetry")]
71    let (metrics_server, metrics_addr) = server::metrics::create_server().await?;
72
73    log::info!("datafusion-server v{} started", env!("CARGO_PKG_VERSION"));
74    log::info!("http service listening on {http_addr:?}");
75    #[cfg(feature = "flight")]
76    log::info!("flight gRPC service listening on {flight_addr:?}");
77    #[cfg(feature = "telemetry")]
78    log::info!("metrics service listening on {metrics_addr:?}");
79    log::debug!("with config: {}", Settings::global().debug());
80
81    let http_service =
82        http_server.with_graceful_shutdown(signal_handler::register_shutdown_signal());
83
84    #[cfg(feature = "flight")]
85    let flight_service = Some(Box::pin(
86        tonic::transport::Server::builder()
87            .add_service(flight_server)
88            .serve(flight_addr),
89    ));
90    #[cfg(not(feature = "flight"))]
91    let flight_service: Option<BoxedFuture> = None;
92
93    #[cfg(feature = "telemetry")]
94    let metrics_service = Some(metrics_server.into_future());
95    #[cfg(not(feature = "telemetry"))]
96    let metrics_service: Option<BoxedFuture> = None;
97
98    tokio::select! {
99        http_result = http_service.into_future() => if let Err(e) = http_result {
100            log::error!("Can not initialize http server: {e:?}");
101            return Err(anyhow::anyhow!("http server initialization error: {:?}", e));
102        },
103        flight_result = async {
104            if let Some(future) = flight_service {
105                future.await
106            } else {
107                futures::future::pending().await
108            }
109        } => if let Err(e) = flight_result {
110            log::error!("Can not initialize flight gRPC server: {e:?}");
111            return Err(anyhow::anyhow!("flight server initialization error: {:?}", e));
112        },
113        metrics_result = async {
114            if let Some(future) = metrics_service {
115                future.await
116            } else {
117                futures::future::pending().await
118            }
119        } => if let Err(e) = metrics_result {
120            log::error!("Can not initialize metrics server: {e:?}");
121            return Err(anyhow::anyhow!("metrics server initialization error: {e:?}"));
122        },
123        () = interval_worker::cleanup_and_update_metrics(session_mgr) => {},
124    }
125
126    log::info!("Server terminated");
127
128    Ok(())
129}