use std::{net::SocketAddr, pin::Pin};
use anyhow::Context;
use notmad::Component;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Response;
use uuid::Uuid;
use crate::{
services::{consumers::ConsumersState, ingest::IngestState, staging::StagingEvent},
state::SharedState,
};
include!("gen/nodata.v1.rs");
/// gRPC front-end: bundles the shared application state, the listen address
/// and a Prometheus counter that tracks handled publish requests.
#[derive(Clone)]
pub struct GrpcServer {
/// Shared application state; a clone of the value passed to `new`.
state: SharedState,
/// Socket address the tonic server is served on.
host: SocketAddr,
/// Counter incremented once for every `publish_event` call.
counter: prometheus::Counter,
}
impl GrpcServer {
    /// Builds a server that shares `state` and will listen on `host`.
    ///
    /// As a side effect the `messages_pr_second` counter is created and
    /// registered with the metrics registry held by `state`.
    ///
    /// # Panics
    ///
    /// Panics if the counter cannot be constructed, or if registering it with
    /// the metrics registry fails.
    pub fn new(state: &SharedState, host: SocketAddr) -> Self {
        let opts = prometheus::Opts::new(
            "messages_pr_second",
            "how many messages are ingested pr second",
        );
        let counter = prometheus::Counter::with_opts(opts).unwrap();

        // The registry takes its own boxed handle; we keep `counter` for the handlers.
        let registration = state.metrics_registry.register(Box::new(counter.clone()));
        registration.expect("to be able to register metrics");

        Self {
            state: state.clone(),
            host,
            counter,
        }
    }
}
/// Boxed, pinned, `Send` stream of `SubscribeResponse` results; used as the
/// `SubscribeStream` associated type of the gRPC service implementation.
type ResponseStream =
Pin<Box<dyn tokio_stream::Stream<Item = Result<SubscribeResponse, tonic::Status>> + Send>>;
#[tonic::async_trait]
impl no_data_service_server::NoDataService for GrpcServer {
async fn publish_event(
&self,
request: tonic::Request<PublishEventRequest>,
) -> std::result::Result<tonic::Response<PublishEventResponse>, tonic::Status> {
let req = request.into_inner();
tracing::debug!(
topic = req.topic,
value = std::str::from_utf8(&req.value).ok(),
"handling event"
);
self.counter.inc();
self.state.ingest().publish(req).await.map_err(|e| {
let caused_by = e
.chain()
.map(|e| e.to_string())
.collect::<Vec<String>>()
.join(", ");
tracing::warn!("failed to handle ingest of data: {}: {}", e, caused_by);
tonic::Status::internal(e.to_string())
})?;
Ok(tonic::Response::new(PublishEventResponse {}))
}
async fn get_topics(
&self,
_request: tonic::Request<GetTopicsRequest>,
) -> std::result::Result<tonic::Response<GetTopicsResponse>, tonic::Status> {
let topics = self.state.staging.get_topics().await.map_err(|e| {
tracing::warn!(error = e.to_string(), "failed to get topics");
tonic::Status::internal(e.to_string())
})?;
Ok(tonic::Response::new(GetTopicsResponse { topics }))
}
type SubscribeStream = ResponseStream;
async fn subscribe(
&self,
request: tonic::Request<self::SubscribeRequest>,
) -> std::result::Result<tonic::Response<Self::SubscribeStream>, tonic::Status> {
let req = request.into_inner();
let id = Uuid::new_v4().to_string();
let index = Uuid::new_v4().to_string();
let consumer = self
.state
.consumers()
.add_consumer(&id, &index, req.topic)
.await
.map_err(|e| {
tracing::warn!(error = e.to_string(), "failed to add consumer");
tonic::Status::internal(e.to_string())
})?
.unwrap();
let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move {
let mut event_stream = consumer.rx.lock().await;
while let Some(msg) = event_stream.recv().await {
if let Err(e) = tx
.send(Ok(SubscribeResponse {
published: Some(chrono_to_prost_timestamp(msg.published)),
value: msg.value,
}))
.await
{
tracing::warn!(error = e.to_string(), "failed to send event");
};
}
});
let stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream) as Self::SubscribeStream))
}
}
impl From<PublishEventRequest> for StagingEvent {
fn from(value: PublishEventRequest) -> Self {
Self {
topic: value.topic,
published: chrono::Utc::now(),
value: value.value,
}
}
}
/// Converts a protobuf timestamp into a UTC `chrono` datetime.
///
/// Returns `None` when `DateTime::from_timestamp` rejects the
/// seconds/nanoseconds pair.
fn prost_timestamp_to_chrono(ts: prost_types::Timestamp) -> Option<chrono::DateTime<chrono::Utc>> {
    let prost_types::Timestamp { seconds, nanos } = ts;
    // `nanos` is reinterpreted as unsigned; a negative value becomes a very
    // large `u32` rather than being rejected at this point.
    let subsec_nanos = nanos as u32;
    chrono::DateTime::<chrono::Utc>::from_timestamp(seconds, subsec_nanos)
}
/// Converts a UTC `chrono` datetime into a protobuf timestamp, splitting it
/// into whole seconds since the Unix epoch and the sub-second nanoseconds.
fn chrono_to_prost_timestamp(dt: chrono::DateTime<chrono::Utc>) -> prost_types::Timestamp {
    let seconds = dt.timestamp();
    let nanos = dt.timestamp_subsec_nanos() as i32;
    prost_types::Timestamp { seconds, nanos }
}
/// Runs the gRPC server as a `notmad` component.
#[tonic::async_trait]
impl Component for GrpcServer {
    /// Name under which this component is reported.
    fn name(&self) -> Option<String> {
        Some("grpc_server".into())
    }

    /// Serves the gRPC API on `self.host` until the server fails or
    /// `cancellation_token` is cancelled.
    ///
    /// # Errors
    ///
    /// Returns `MadError::Inner` wrapping the transport error when the server
    /// cannot be started or stops with an error.
    async fn run(
        &self,
        cancellation_token: tokio_util::sync::CancellationToken,
    ) -> Result<(), notmad::MadError> {
        tracing::info!("grpc listening on: {}", self.host);

        let service = no_data_service_server::NoDataServiceServer::new(self.clone());

        tonic::transport::Server::builder()
            .add_service(service)
            // Tie the server's lifetime to the component's cancellation token
            // so a requested shutdown actually stops the listener.
            .serve_with_shutdown(self.host, async move {
                cancellation_token.cancelled().await;
            })
            .await
            .context("grpc server failed")
            .map_err(notmad::MadError::Inner)?;

        Ok(())
    }
}