nodata 0.1.0

nodata is a kafka like message broker that is simple and easy to use, while relying on either local or s3 like data storage for consistency
use std::net::SocketAddr;

use anyhow::Context;
use axum::async_trait;
use axum::extract::MatchedPath;
use axum::extract::State;
use axum::http::Request;
use axum::routing::get;
use axum::Router;
use notmad::Component;
use notmad::MadError;
use prometheus::Encoder;
use prometheus::TextEncoder;
use tokio_util::sync::CancellationToken;
use tower_http::trace::TraceLayer;

use crate::state::SharedState;

pub struct HttpServer {
    state: SharedState,
    host: SocketAddr,
}

impl HttpServer {
    pub fn new(state: &SharedState, host: SocketAddr) -> Self {
        Self {
            state: state.clone(),
            host,
        }
    }
}

#[async_trait]
impl Component for HttpServer {
    async fn run(&self, _cancellation_token: CancellationToken) -> Result<(), notmad::MadError> {
        let app = Router::new()
            .route("/", get(root))
            .route("/metrics", get(metrics))
            .with_state(self.state.clone())
            .layer(
                TraceLayer::new_for_http().make_span_with(|request: &Request<_>| {
                    let matched_path = request
                        .extensions()
                        .get::<MatchedPath>()
                        .map(MatchedPath::as_str);

                    tracing::info_span!(
                        "http_request",
                        method = ?request.method(),
                        matched_path,
                        some_other_field = tracing::field::Empty,
                    )
                }),
            );

        tracing::info!("http: listening on {}", self.host);
        let listener = tokio::net::TcpListener::bind(self.host).await.unwrap();
        axum::serve(listener, app.into_make_service())
            .await
            .context("axum server stopped")
            .map_err(MadError::Inner)?;

        Ok(())
    }
}

async fn root() -> &'static str {
    "Hello, nodata!"
}

async fn metrics(State(state): State<SharedState>) -> String {
    let encoder = TextEncoder::new();
    let metrics = state.metrics_registry.gather();

    let mut buffer = Vec::new();
    encoder
        .encode(&metrics, &mut buffer)
        .expect("to be able to encode metrics");

    String::from_utf8(buffer).expect("to be able to encode from utf8")
}