use std::{net::SocketAddr, sync::Arc};
use arc_swap::ArcSwapOption;
use axum::routing::get;
#[cfg(feature = "http-api-gzip")]
use tower_http::compression::CompressionLayer;
use log::{debug, error};
use tokio::{sync::mpsc, task::JoinHandle};
use crate::{ingress::{self, http_ng::IngressApi}, units::rib_unit::rib::Rib, webui::WebUI};
#[derive(Default)]
pub struct Api {
interfaces: Vec<SocketAddr>,
api_root: String,
pub store: Arc<ArcSwapOption<Rib>>,
ingress_register: Arc<ingress::Register>,
metrics: crate::metrics::Collection,
router: axum::Router<ApiState>,
signal_txs: Vec<mpsc::Sender<()>>,
serve_handles: Vec<JoinHandle<()>>,
}
#[derive(Clone)]
pub struct ApiState {
pub(crate) store: Arc<ArcSwapOption<Rib>>,
pub(crate) ingress_register: Arc<ingress::Register>,
pub(crate) metrics: crate::metrics::Collection,
}
impl Api {
pub fn new(
interfaces: Vec<SocketAddr>,
ingress_register: Arc<ingress::Register>,
metrics: crate::metrics::Collection,
) -> Self {
let state = ApiState {
store: Default::default(),
ingress_register: ingress_register.clone(),
metrics: metrics.clone(),
};
let router = axum::Router::<ApiState>::new()
.route("/metrics", get(Self::metrics))
.with_state(state)
;
let mut res = Self {
api_root: "".into(),
store: Default::default(),
interfaces,
ingress_register,
metrics,
router,
signal_txs: vec![],
serve_handles: vec![],
};
eprintln!("calling WebUI::register_routes");
WebUI::register_routes(&mut res);
eprintln!("calling WebUI::register_routes done");
res.api_root = "/api/v1".into();
IngressApi::register_routes(&mut res);
res
}
async fn metrics(state: axum::extract::State<ApiState>) -> Result<String, String> {
Ok(state.metrics.assemble(crate::metrics::OutputFormat::Prometheus))
}
pub fn cloned_api_state(&self) -> ApiState {
debug!("cloned_api_state(), store is_some: {:?}", self.store.load().is_some());
ApiState {
store: self.store.clone(),
ingress_register: self.ingress_register.clone(),
metrics: self.metrics.clone(),
}
}
pub fn set_rib(&mut self, rib: Arc<Rib>) {
if self.store.swap(Some(rib)).is_some() {
debug!("http_ng set_rib(): Rib already set")
}
}
pub fn set_interfaces(&mut self, interfaces: impl IntoIterator<Item=SocketAddr>) { self.interfaces = interfaces.into_iter().collect();
}
pub fn add_get<H, T>(&mut self, path: impl AsRef<str>, handler: H)
where
H: axum::handler::Handler<T, ApiState>,
T: 'static,
{
debug!("add_get for {}", path.as_ref());
self.router = self.router.clone()
.route(&format!("{}{}", self.api_root, path.as_ref()), get(handler))
.with_state(
self.cloned_api_state()
);
}
pub fn start(&mut self) {
self.signal_txs = vec![];
self.serve_handles = vec![];
for interface in self.interfaces.clone() {
let (signal_tx, signal_rx) = mpsc::channel::<()>(1);
self.signal_txs.push(signal_tx);
debug!("starting Api on interface {interface}");
let mut app = self.router.clone().with_state(
self.cloned_api_state()
);
#[cfg(feature = "http-api-gzip")]
{
app = app.layer(CompressionLayer::new());
}
let h = tokio::spawn(async move {
let listener = match tokio::net::TcpListener::bind(interface).await {
Ok(listener) => listener,
Err(e) => {
error!("Could not bind on {}: {}", interface, e);
return;
}
};
let _ = axum::serve(listener, app)
.with_graceful_shutdown(Self::shutdown(signal_rx))
.await;
});
self.serve_handles.push(h);
}
}
async fn shutdown(mut rx: mpsc::Receiver<()>) {
rx.recv().await;
}
pub fn restart(&mut self) {
for tx in self.signal_txs.drain(..) {
let _ = tx.try_send(());
}
for h in self.serve_handles.drain(..) {
let handle = tokio::runtime::Handle::current();
tokio::task::block_in_place(move || {
h.is_finished();
let _ = handle.block_on(h);
});
}
self.start();
}
}
pub enum ApiError {
BadRequest(String),
InternalServerError(String),
}
impl axum::response::IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
debug!("in into_response()");
fn to_json(msg: String) -> String {
debug!("in to_json() in into_response()");
serde_json::json!({
"data": None::<()>,
"error": msg
}).to_string()
}
(
[("content-type", "application/json")],
match self {
ApiError::BadRequest(msg) => {
(axum::http::StatusCode::BAD_REQUEST, to_json(msg))
}
ApiError::InternalServerError(msg) => {
(axum::http::StatusCode::INTERNAL_SERVER_ERROR, to_json(msg))
}
}
).into_response()
}
}