mod router;
mod tpch;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use crate::{
args::{Command, DftArgs},
config::AppConfig,
db::register_db,
execution::AppExecution,
};
use axum::Router;
use color_eyre::Result;
use datafusion_app::{
config::merge_configs, extensions::DftSessionStateBuilder, local::ExecutionContext,
};
use router::create_router;
use tokio::{net::TcpListener, signal};
use tracing::{debug, info};
#[cfg(feature = "flightsql")]
use {
datafusion_app::{
config::{AuthConfig, FlightSQLConfig},
flightsql::FlightSQLContext,
},
tracing::error,
};
use super::try_start_metrics_server;
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
}
pub struct HttpApp {
listener: TcpListener,
router: Router,
}
impl HttpApp {
pub async fn try_new(
execution: AppExecution,
config: AppConfig,
addr: SocketAddr,
metrics_addr: SocketAddr,
) -> Result<Self> {
info!("listening to HTTP on {addr}");
let listener = TcpListener::bind(addr).await.unwrap();
let router = create_router(execution, config.http_server);
try_start_metrics_server(metrics_addr)?;
let app = Self { listener, router };
Ok(app)
}
pub async fn run(self) {
match axum::serve(self.listener, self.router)
.with_graceful_shutdown(shutdown_signal())
.await
{
Ok(_) => {
info!("Shutting down app")
}
Err(_) => {
panic!("Error serving HTTP app")
}
}
}
}
pub async fn try_run(cli: DftArgs, config: AppConfig) -> Result<()> {
let merged_exec_config =
merge_configs(config.shared.clone(), config.http_server.execution.clone());
let session_state_builder = DftSessionStateBuilder::try_new(Some(merged_exec_config.clone()))?
.with_extensions()
.await?;
let session_state = session_state_builder.build()?;
let execution_ctx = ExecutionContext::try_new(
&merged_exec_config,
session_state,
crate::APP_NAME,
env!("CARGO_PKG_VERSION"),
)?;
if cli.run_ddl {
execution_ctx.execute_ddl().await;
}
#[allow(unused_mut)]
let mut app_execution = AppExecution::new(execution_ctx);
#[cfg(feature = "flightsql")]
{
info!("Setting up FlightSQLContext");
let auth = AuthConfig {
basic_auth: config.flightsql_client.auth.basic_auth.clone(),
bearer_token: config.flightsql_client.auth.bearer_token.clone(),
};
let flightsql_cfg = FlightSQLConfig::new(
config.flightsql_client.connection_url.clone(),
config.flightsql_client.benchmark_iterations,
auth,
config.flightsql_client.headers.clone(),
);
let flightsql_context = FlightSQLContext::new(flightsql_cfg.clone());
if let Err(e) = flightsql_context
.create_client(
Some(flightsql_cfg.connection_url),
Some(flightsql_cfg.headers),
)
.await
{
error!("{}", e.to_string())
} else {
app_execution.with_flightsql_ctx(flightsql_context);
}
}
debug!("Created AppExecution: {app_execution:?}");
let (addr, metrics_addr) = if let Some(cmd) = cli.command.clone() {
match cmd {
Command::ServeHttp {
addr: Some(addr),
metrics_addr: Some(metrics_addr),
..
} => (addr, metrics_addr),
Command::ServeHttp {
addr: Some(addr),
metrics_addr: None,
..
} => (addr, config.http_server.server_metrics_addr),
Command::ServeHttp {
addr: None,
metrics_addr: Some(metrics_addr),
..
} => (
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
metrics_addr,
),
_ => (
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
config.http_server.server_metrics_addr,
),
}
} else {
(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
config.http_server.server_metrics_addr,
)
};
register_db(app_execution.session_ctx(), &config.db).await?;
let app = HttpApp::try_new(app_execution, config.clone(), addr, metrics_addr).await?;
app.run().await;
Ok(())
}