#![forbid(unsafe_code)]
#![deny(rust_2018_idioms)]
pub mod aggregator;
pub mod exporters;
pub mod routes;
pub mod spa;
pub mod ws;
use std::net::SocketAddr;
use std::sync::Arc;
use axum::Router;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use rakka_telemetry::TelemetryExtension;
#[derive(Clone, Debug, Default)]
pub enum DashboardMode {
#[default]
Local,
Cluster { peers: Vec<String> },
}
#[derive(Clone, Debug)]
pub struct DashboardConfig {
pub bind: SocketAddr,
pub mode: DashboardMode,
pub ws_channel_capacity: usize,
pub exporters: rakka_telemetry::exporters::config::ExportersConfig,
}
impl Default for DashboardConfig {
fn default() -> Self {
Self {
bind: "127.0.0.1:9100".parse().unwrap(),
mode: DashboardMode::Local,
ws_channel_capacity: 1024,
exporters: Default::default(),
}
}
}
#[derive(Clone)]
pub struct AppState {
pub telemetry: Arc<TelemetryExtension>,
pub mode: DashboardMode,
pub exporters: exporters::ExporterHandles,
}
impl AppState {
pub fn new(telemetry: Arc<TelemetryExtension>, mode: DashboardMode) -> Self {
Self { telemetry, mode, exporters: Default::default() }
}
}
pub struct DashboardHandle {
pub bound_addr: SocketAddr,
shutdown_tx: Option<oneshot::Sender<()>>,
join: Option<JoinHandle<()>>,
}
impl DashboardHandle {
pub async fn shutdown(mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
if let Some(j) = self.join.take() {
let _ = j.await;
}
}
}
pub struct DashboardServer {
telemetry: Arc<TelemetryExtension>,
config: DashboardConfig,
}
impl DashboardServer {
pub fn new(telemetry: Arc<TelemetryExtension>, config: DashboardConfig) -> Self {
Self { telemetry, config }
}
pub fn router(&self) -> Router {
let state = AppState::new(self.telemetry.clone(), self.config.mode.clone());
routes::build_router(state, self.config.ws_channel_capacity)
}
pub fn router_with_exporters(&self) -> Result<Router, ServerError> {
let handles = exporters::apply(&self.telemetry, &self.config.exporters)?;
let mut state = AppState::new(self.telemetry.clone(), self.config.mode.clone());
state.exporters = handles;
Ok(routes::build_router(state, self.config.ws_channel_capacity))
}
pub async fn start(self) -> Result<DashboardHandle, ServerError> {
let handles = exporters::apply(&self.telemetry, &self.config.exporters)?;
let mut state = AppState::new(self.telemetry.clone(), self.config.mode.clone());
state.exporters = handles;
let router = routes::build_router(state, self.config.ws_channel_capacity);
let listener = tokio::net::TcpListener::bind(self.config.bind).await.map_err(ServerError::Bind)?;
let bound = listener.local_addr().map_err(ServerError::Bind)?;
let (tx, rx) = oneshot::channel::<()>();
let join = tokio::spawn(async move {
let _ = axum::serve(listener, router.into_make_service())
.with_graceful_shutdown(async {
let _ = rx.await;
})
.await;
});
Ok(DashboardHandle { bound_addr: bound, shutdown_tx: Some(tx), join: Some(join) })
}
}
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
#[error("failed to bind: {0}")]
Bind(std::io::Error),
#[error("exporter init failed: {0}")]
Exporter(String),
}