nodata 0.1.0

nodata is a Kafka-like message broker that is simple and easy to use, relying on either local or S3-like data storage for consistency.
use std::{net::SocketAddr, pin::Pin};

use anyhow::Context;
use notmad::Component;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Response;
use uuid::Uuid;

use crate::{
    services::{consumers::ConsumersState, ingest::IngestState, staging::StagingEvent},
    state::SharedState,
};

include!("gen/nodata.v1.rs");

/// gRPC front-end for the broker.
///
/// Implements the generated `NoDataService` server trait and is run as a
/// `notmad` [`Component`]. The value is cloned when it is handed to the tonic
/// service wrapper, hence the `Clone` derive.
#[derive(Clone)]
pub struct GrpcServer {
    /// Shared application state; the handlers reach the ingest, consumers and
    /// staging services (and the metrics registry) through it.
    state: SharedState,
    /// Socket address the gRPC server is served on.
    host: SocketAddr,
    /// Counter incremented once per `publish_event` call; registered under the
    /// metric name `messages_pr_second`.
    counter: prometheus::Counter,
}

impl GrpcServer {
    /// Builds a gRPC server bound to `host` that operates on a clone of `state`.
    ///
    /// A `messages_pr_second` counter is created and registered with the
    /// metrics registry found on `state`.
    ///
    /// # Panics
    ///
    /// Panics if the counter cannot be constructed or if registering it with
    /// the metrics registry fails.
    pub fn new(state: &SharedState, host: SocketAddr) -> Self {
        let opts = prometheus::Opts::new(
            "messages_pr_second",
            "how many messages are ingested pr second",
        );
        let counter = prometheus::Counter::with_opts(opts).unwrap();

        // The registry takes ownership of a boxed collector; the server keeps
        // its own handle to the same counter for incrementing.
        let collector = Box::new(counter.clone());
        let registry = &state.metrics_registry;
        registry
            .register(collector)
            .expect("to be able to register metrics");

        let state = state.clone();
        Self {
            state,
            host,
            counter,
        }
    }
}

/// Boxed, pinned, `Send` stream of subscribe results; used as the
/// `SubscribeStream` associated type of the gRPC service implementation.
type ResponseStream =
    Pin<Box<dyn tokio_stream::Stream<Item = Result<SubscribeResponse, tonic::Status>> + Send>>;

#[tonic::async_trait]
impl no_data_service_server::NoDataService for GrpcServer {
    /// Hands a published event over to the ingest service.
    ///
    /// The ingest counter is incremented before the publish is attempted, so
    /// failed publishes are counted as well.
    ///
    /// # Errors
    ///
    /// Returns `Status::internal` carrying the top-level error message when
    /// the ingest service rejects the event.
    async fn publish_event(
        &self,
        request: tonic::Request<PublishEventRequest>,
    ) -> std::result::Result<tonic::Response<PublishEventResponse>, tonic::Status> {
        let req = request.into_inner();

        // The payload is only logged when it is valid UTF-8; otherwise the
        // field is left empty.
        tracing::debug!(
            topic = req.topic,
            value = std::str::from_utf8(&req.value).ok(),
            "handling event"
        );

        self.counter.inc();

        self.state.ingest().publish(req).await.map_err(|e| {
            // Flatten the whole error chain into one line for the log; only
            // the top-level message is returned to the client.
            let caused_by = e
                .chain()
                .map(|e| e.to_string())
                .collect::<Vec<String>>()
                .join(", ");
            tracing::warn!("failed to handle ingest of data: {}: {}", e, caused_by);
            tonic::Status::internal(e.to_string())
        })?;

        Ok(tonic::Response::new(PublishEventResponse {}))
    }

    /// Returns the topics reported by the staging service.
    ///
    /// # Errors
    ///
    /// Returns `Status::internal` when the staging lookup fails.
    async fn get_topics(
        &self,
        _request: tonic::Request<GetTopicsRequest>,
    ) -> std::result::Result<tonic::Response<GetTopicsResponse>, tonic::Status> {
        let topics = self.state.staging.get_topics().await.map_err(|e| {
            tracing::warn!(error = e.to_string(), "failed to get topics");
            tonic::Status::internal(e.to_string())
        })?;

        Ok(tonic::Response::new(GetTopicsResponse { topics }))
    }

    type SubscribeStream = ResponseStream;

    /// Registers a fresh consumer for the requested topic and streams its
    /// events back to the caller.
    ///
    /// Every call uses newly generated UUIDs for both the consumer id and the
    /// consumer index. Events are forwarded by a background task through a
    /// bounded channel (capacity 128); the task stops when the consumer's
    /// event source ends or when the client side of the stream is dropped.
    ///
    /// # Errors
    ///
    /// Returns `Status::internal` when the consumer cannot be added, or when
    /// the consumers service reports success but hands back no consumer.
    async fn subscribe(
        &self,
        request: tonic::Request<self::SubscribeRequest>,
    ) -> std::result::Result<tonic::Response<Self::SubscribeStream>, tonic::Status> {
        let req = request.into_inner();

        let id = Uuid::new_v4().to_string();
        let index = Uuid::new_v4().to_string();

        let consumer = self
            .state
            .consumers()
            .add_consumer(&id, &index, req.topic)
            .await
            .map_err(|e| {
                tracing::warn!(error = e.to_string(), "failed to add consumer");
                tonic::Status::internal(e.to_string())
            })?
            // NOTE(review): `None` presumably means no consumer was created
            // (e.g. the id already existed) — confirm against `add_consumer`.
            // Either way, report it to the caller instead of panicking inside
            // the request handler.
            .ok_or_else(|| {
                tracing::warn!("failed to add consumer: no consumer was returned");
                tonic::Status::internal("failed to add consumer")
            })?;

        let (tx, rx) = tokio::sync::mpsc::channel(128);
        tokio::spawn(async move {
            let mut event_stream = consumer.rx.lock().await;
            while let Some(msg) = event_stream.recv().await {
                let response = SubscribeResponse {
                    published: Some(chrono_to_prost_timestamp(msg.published)),
                    value: msg.value,
                };

                if let Err(e) = tx.send(Ok(response)).await {
                    tracing::warn!(error = e.to_string(), "failed to send event");
                    // A send error means the receiving half was dropped, i.e.
                    // the subscriber is gone. Stop forwarding rather than
                    // draining and discarding events forever.
                    break;
                }
            }
        });

        let stream = ReceiverStream::new(rx);
        Ok(Response::new(Box::pin(stream) as Self::SubscribeStream))
    }
}

impl From<PublishEventRequest> for StagingEvent {
    fn from(value: PublishEventRequest) -> Self {
        Self {
            topic: value.topic,
            published: chrono::Utc::now(),
            value: value.value,
        }
    }
}

/// Converts a protobuf timestamp into a UTC `chrono` date-time.
///
/// Returns `None` when `chrono` rejects the seconds/nanoseconds pair.
fn prost_timestamp_to_chrono(ts: prost_types::Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
    let prost_types::Timestamp { seconds, nanos } = ts;
    // Plain `as` cast: a negative `nanos` wraps around to a value >= 2^31.
    // NOTE(review): chrono is expected to reject such values — confirm.
    let subsec_nanos = nanos as u32;

    chrono::DateTime::<chrono::Utc>::from_timestamp(seconds, subsec_nanos)
}

/// Converts a UTC `chrono` date-time into a protobuf timestamp, splitting it
/// into whole seconds and the sub-second nanosecond remainder.
fn chrono_to_prost_timestamp(dt: chrono::DateTime<chrono::Utc>) -> prost_types::Timestamp {
    let seconds = dt.timestamp();
    let nanos = dt.timestamp_subsec_nanos() as i32;

    prost_types::Timestamp { seconds, nanos }
}

#[axum::async_trait]
impl Component for GrpcServer {
    /// Name under which this component is identified.
    fn name(&self) -> Option<String> {
        Some("grpc_server".into())
    }

    /// Serves the gRPC API on `self.host` until the cancellation token fires
    /// or the server fails.
    ///
    /// Once `cancellation_token` is cancelled the server is asked to shut
    /// down and the function returns `Ok(())` after the shutdown completes.
    ///
    /// # Errors
    ///
    /// Returns `MadError::Inner` wrapping the transport error (with the
    /// context "grpc server failed") when the server stops with an error.
    async fn run(
        &self,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<(), notmad::MadError> {
        tracing::info!("grpc listening on: {}", self.host);

        tonic::transport::Server::builder()
            .add_service(no_data_service_server::NoDataServiceServer::new(
                self.clone(),
            ))
            // Tie the server lifetime to the component's cancellation token so
            // that a shutdown request actually stops the listener.
            .serve_with_shutdown(self.host, async move {
                cancellation_token.cancelled().await;
            })
            .await
            .context("grpc server failed")
            .map_err(notmad::MadError::Inner)?;

        Ok(())
    }
}