event_notification/
producer.rs1use crate::Error;
2use crate::Event;
3use async_trait::async_trait;
4
5#[allow(dead_code)]
7#[async_trait]
8pub trait EventProducer: Send + Sync {
9 async fn start(&self) -> Result<(), Error>;
11 async fn stop(&self) -> Result<(), Error>;
13 async fn send_event(&self, event: Event) -> Result<(), Error>;
15}
16
17#[cfg(feature = "http-producer")]
18pub mod http {
19 use super::*;
20 use axum::{Json, Router, routing::post};
21 use std::sync::Arc;
22 use tokio::sync::mpsc;
23
24 #[derive(Clone)]
25 pub struct HttpProducer {
26 tx: mpsc::Sender<Event>,
27 port: u16,
28 shutdown: Arc<tokio::sync::Notify>,
29 }
30
31 impl HttpProducer {
32 pub fn new(tx: mpsc::Sender<Event>, port: u16) -> Self {
33 Self {
34 tx,
35 port,
36 shutdown: Arc::new(tokio::sync::Notify::new()),
37 }
38 }
39 }
40
41 #[async_trait]
42 impl EventProducer for HttpProducer {
43 async fn start(&self) -> Result<(), Error> {
44 let producer = self.clone();
45 let app = Router::new().route(
46 "/event",
47 post(move |event| {
48 let prod = producer.clone();
49 async move { handle_event(event, prod).await }
50 }),
51 );
52
53 let addr = format!("0.0.0.0:{}", self.port);
54 let listener = tokio::net::TcpListener::bind(&addr).await?;
55
56 let shutdown = self.shutdown.clone();
57 tokio::select! {
58 result = axum::serve(listener, app) => {
59 result?;
60 Ok(())
61 }
62 _ = shutdown.notified() => Ok(())
63 }
64 }
65
66 async fn stop(&self) -> Result<(), Error> {
67 self.shutdown.notify_one();
68 Ok(())
69 }
70
71 async fn send_event(&self, event: Event) -> Result<(), Error> {
72 self.tx
73 .send(event)
74 .await
75 .map_err(|e| Error::ChannelSend(Box::new(e)))?;
76 Ok(())
77 }
78 }
79
80 async fn handle_event(
81 Json(event): Json<Event>,
82 producer: HttpProducer,
83 ) -> Result<(), axum::http::StatusCode> {
84 producer
85 .send_event(event)
86 .await
87 .map_err(|_| axum::http::StatusCode::INTERNAL_SERVER_ERROR)
88 }
89}