event_notification/
producer.rs

1use crate::Error;
2use crate::Event;
3use async_trait::async_trait;
4
5/// event producer characteristics
6#[allow(dead_code)]
7#[async_trait]
8pub trait EventProducer: Send + Sync {
9    /// start producer services
10    async fn start(&self) -> Result<(), Error>;
11    /// stop producer services
12    async fn stop(&self) -> Result<(), Error>;
13    /// send a single event
14    async fn send_event(&self, event: Event) -> Result<(), Error>;
15}
16
17#[cfg(feature = "http-producer")]
18pub mod http {
19    use super::*;
20    use axum::{Json, Router, routing::post};
21    use std::sync::Arc;
22    use tokio::sync::mpsc;
23
24    #[derive(Clone)]
25    pub struct HttpProducer {
26        tx: mpsc::Sender<Event>,
27        port: u16,
28        shutdown: Arc<tokio::sync::Notify>,
29    }
30
31    impl HttpProducer {
32        pub fn new(tx: mpsc::Sender<Event>, port: u16) -> Self {
33            Self {
34                tx,
35                port,
36                shutdown: Arc::new(tokio::sync::Notify::new()),
37            }
38        }
39    }
40
41    #[async_trait]
42    impl EventProducer for HttpProducer {
43        async fn start(&self) -> Result<(), Error> {
44            let producer = self.clone();
45            let app = Router::new().route(
46                "/event",
47                post(move |event| {
48                    let prod = producer.clone();
49                    async move { handle_event(event, prod).await }
50                }),
51            );
52
53            let addr = format!("0.0.0.0:{}", self.port);
54            let listener = tokio::net::TcpListener::bind(&addr).await?;
55
56            let shutdown = self.shutdown.clone();
57            tokio::select! {
58                result = axum::serve(listener, app) => {
59                    result?;
60                    Ok(())
61                }
62                _ = shutdown.notified() => Ok(())
63            }
64        }
65
66        async fn stop(&self) -> Result<(), Error> {
67            self.shutdown.notify_one();
68            Ok(())
69        }
70
71        async fn send_event(&self, event: Event) -> Result<(), Error> {
72            self.tx
73                .send(event)
74                .await
75                .map_err(|e| Error::ChannelSend(Box::new(e)))?;
76            Ok(())
77        }
78    }
79
80    async fn handle_event(
81        Json(event): Json<Event>,
82        producer: HttpProducer,
83    ) -> Result<(), axum::http::StatusCode> {
84        producer
85            .send_event(event)
86            .await
87            .map_err(|_| axum::http::StatusCode::INTERNAL_SERVER_ERROR)
88    }
89}