holter/
lib.rs

1use axum::{
2    extract::State,
3    http::StatusCode,
4    response::IntoResponse,
5    routing::{get, IntoMakeService},
6    serve::WithGracefulShutdown,
7    Router,
8};
9use metrics_exporter_prometheus::{BuildError, PrometheusBuilder, PrometheusHandle};
10#[cfg(feature = "db")]
11use sqlx::{Database, Executor, IntoArguments, Pool};
12use std::{future::Future, net::SocketAddr};
13use tokio::net::TcpListener;
14
15#[cfg(feature = "api-metrics")]
16pub mod metrics_middleware;
17
18#[cfg(feature = "db")]
19#[derive(Clone)]
20pub struct HolterState<DB>
21where
22    DB: Database + Clone + Send + Sync + 'static,
23    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
24    for<'q> DB::Arguments<'q>: IntoArguments<'q, DB>,
25{
26    db: Option<Pool<DB>>,
27    metric_handle: PrometheusHandle,
28}
29
30#[cfg(not(feature = "db"))]
31#[derive(Clone)]
32pub struct HolterState {
33    metric_handle: PrometheusHandle,
34}
35
36#[cfg(feature = "db")]
37pub struct HolterServer<DB>
38where
39    DB: Database + Clone + Send + Sync + 'static,
40    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
41    for<'q> DB::Arguments<'q>: IntoArguments<'q, DB>,
42{
43    listen_addr: SocketAddr,
44    state: HolterState<DB>,
45}
46
47#[cfg(not(feature = "db"))]
48pub struct HolterServer {
49    listen_addr: SocketAddr,
50    state: HolterState,
51}
52
53#[cfg(feature = "db")]
54pub struct HolterServerBuilder<DB>
55where
56    DB: Database + Clone + Send + Sync + 'static,
57    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
58    for<'q> DB::Arguments<'q>: IntoArguments<'q, DB>,
59{
60    listen_addr: SocketAddr,
61    db: Option<Pool<DB>>,
62}
63
64#[cfg(not(feature = "db"))]
65pub struct HolterServerBuilder {
66    listen_addr: SocketAddr,
67}
68
69#[cfg(feature = "db")]
70impl<DB> Default for HolterServerBuilder<DB>
71where
72    DB: Database + Clone + Send + Sync + 'static,
73    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
74    for<'q> DB::Arguments<'q>: IntoArguments<'q, DB>,
75{
76    fn default() -> Self {
77        Self {
78            listen_addr: "127.0.0.1:9090".parse().unwrap(),
79            db: None,
80        }
81    }
82}
83
84#[cfg(not(feature = "db"))]
85impl Default for HolterServerBuilder {
86    fn default() -> Self {
87        Self {
88            listen_addr: "127.0.0.1:9090".parse().unwrap(),
89        }
90    }
91}
92
93#[cfg(feature = "db")]
94impl<DB> HolterServerBuilder<DB>
95where
96    DB: Database + Clone + Send + Sync + 'static,
97    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
98    for<'q> DB::Arguments<'q>: IntoArguments<'q, DB>,
99{
100    pub fn new() -> Self {
101        Self::default()
102    }
103
104    pub fn add_listen_addr(mut self, listen_addr: SocketAddr) -> Self {
105        self.listen_addr = listen_addr;
106        self
107    }
108
109    pub fn add_db_connection(mut self, pool: Pool<DB>) -> Self {
110        self.db = Some(pool);
111        self
112    }
113
114    pub fn build(self) -> Result<HolterServer<DB>, BuildError> {
115        let metric_handle = PrometheusBuilder::new().install_recorder()?;
116        let state = HolterState {
117            db: self.db,
118            metric_handle,
119        };
120        Ok(HolterServer {
121            listen_addr: self.listen_addr,
122            state,
123        })
124    }
125}
126
127#[cfg(not(feature = "db"))]
128impl HolterServerBuilder {
129    pub fn new() -> Self {
130        Self::default()
131    }
132
133    pub fn add_listen_addr(mut self, listen_addr: SocketAddr) -> Self {
134        self.listen_addr = listen_addr;
135        self
136    }
137
138    pub fn build(self) -> Result<HolterServer, BuildError> {
139        let metric_handle = PrometheusBuilder::new().install_recorder()?;
140        let state = HolterState { metric_handle };
141        Ok(HolterServer {
142            listen_addr: self.listen_addr,
143            state,
144        })
145    }
146}
147
148#[cfg(feature = "db")]
149impl<DB> HolterServer<DB>
150where
151    DB: Database + Clone + Send + Sync + 'static,
152    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
153    for<'q> DB::Arguments<'q>: IntoArguments<'q, DB>,
154{
155    pub async fn serve<L, M, S, F>(
156        self,
157        signal: F,
158    ) -> WithGracefulShutdown<TcpListener, IntoMakeService<Router>, Router, F>
159    where
160        F: Future<Output = ()> + Send + 'static,
161    {
162        let router = Router::new()
163            .route("/metrics", get(metrics_handler))
164            .route("/healthz", get(health_handler))
165            .with_state(self.state.clone());
166
167        axum::serve(
168            TcpListener::bind(&self.listen_addr)
169                .await
170                .expect("holter tcp bind error"),
171            router.into_make_service(),
172        )
173        .with_graceful_shutdown(signal)
174    }
175}
176
177#[cfg(not(feature = "db"))]
178impl HolterServer {
179    pub async fn serve<L, M, S, F>(
180        self,
181        signal: F,
182    ) -> WithGracefulShutdown<TcpListener, IntoMakeService<Router>, Router, F>
183    where
184        F: Future<Output = ()> + Send + 'static,
185    {
186        let router = Router::new()
187            .route("/metrics", get(metrics_handler))
188            .route("/healthz", get(health_handler))
189            .with_state(self.state.clone());
190
191        axum::serve(
192            TcpListener::bind(&self.listen_addr)
193                .await
194                .expect("holter tcp bind error"),
195            router.into_make_service(),
196        )
197        .with_graceful_shutdown(signal)
198    }
199}
200
201#[cfg(feature = "db")]
202pub async fn health_handler<DB>(State(data): State<HolterState<DB>>) -> impl IntoResponse
203where
204    DB: Database + Clone + Send + Sync + 'static,
205    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
206    for<'q> DB::Arguments<'q>: IntoArguments<'q, DB>,
207{
208    if let Some(ref db) = data.db {
209        if sqlx::query("SELECT 1").execute(db).await.is_ok() {
210            return StatusCode::INTERNAL_SERVER_ERROR;
211        }
212    }
213    StatusCode::OK
214}
215
216#[cfg(not(feature = "db"))]
217pub async fn health_handler(_state: State<HolterState>) -> impl IntoResponse {
218    StatusCode::OK
219}
220
221#[cfg(feature = "db")]
222pub async fn metrics_handler<DB>(State(data): State<HolterState<DB>>) -> impl IntoResponse
223where
224    DB: Database + Clone + Send + Sync + 'static,
225    for<'c> &'c mut DB::Connection: Executor<'c, Database = DB>,
226    for<'q> DB::Arguments<'q>: IntoArguments<'q, DB>,
227{
228    data.metric_handle.render()
229}
230
231#[cfg(not(feature = "db"))]
232pub async fn metrics_handler(State(data): State<HolterState>) -> impl IntoResponse {
233    data.metric_handle.render()
234}