1use std::{future::Future, net::SocketAddr, path::PathBuf};
2
3use axum::{
4 Router,
5 http::{StatusCode, header},
6 middleware,
7 response::IntoResponse,
8 routing::{get, post},
9};
10use detritus_protocol::otlp::logs::LogsServiceServer;
11use tokio::net::TcpListener;
12use tonic::service::Routes;
13use tower::ServiceBuilder;
14use tower_http::{
15 compression::CompressionLayer, decompression::RequestDecompressionLayer,
16 limit::RequestBodyLimitLayer, trace::TraceLayer,
17};
18
19use crate::{
20 auth::{AuthState, TokenStore, auth_middleware},
21 crashes::crashes_handler,
22 janitor::{RetentionConfig, spawn_janitor},
23 logs::{LogWriterPool, LogsHandler},
24 metrics::Metrics,
25 rate_limit::{RateLimitConfig, RateLimiter},
26 schemas::SchemaRegistry,
27 storage::StoragePaths,
28};
29
30#[derive(Clone)]
32pub struct ServerConfig {
33 pub bind: SocketAddr,
35 pub data_dir: PathBuf,
37 pub max_dump_bytes: u64,
39 pub token_store: TokenStore,
41 pub rate_limit: RateLimitConfig,
43 pub retention: RetentionConfig,
45 pub schema_registry: SchemaRegistry,
48}
49
50#[derive(Clone)]
51pub(crate) struct AppState {
52 pub(crate) storage: StoragePaths,
53 pub(crate) max_dump_bytes: u64,
54 pub(crate) rate_limiter: RateLimiter,
55 pub(crate) metrics: Metrics,
56 pub(crate) schema_registry: SchemaRegistry,
57}
58
59pub async fn serve(config: ServerConfig) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
66 let listener = TcpListener::bind(config.bind).await?;
67 serve_with_shutdown(listener, config, shutdown_signal()).await
68}
69
70pub async fn serve_with_shutdown(
104 listener: TcpListener,
105 config: ServerConfig,
106 shutdown: impl Future<Output = ()> + Send + 'static,
107) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
108 let storage = StoragePaths::new(config.data_dir);
109 storage.prepare().await?;
110 let metrics = Metrics::new()?;
111 let rate_limiter = RateLimiter::new(config.rate_limit);
112 let writers = LogWriterPool::new(storage.clone());
113 let janitor = spawn_janitor(storage.clone(), config.retention, metrics.clone());
114 let app = app(
115 storage,
116 writers.clone(),
117 config.max_dump_bytes,
118 config.token_store,
119 rate_limiter,
120 metrics,
121 config.schema_registry,
122 );
123 let addr = listener.local_addr()?;
124 tracing::info!(%addr, "observability server listening");
125
126 let result = axum::serve(listener, app)
127 .with_graceful_shutdown(shutdown)
128 .await;
129 janitor.abort();
130 writers.shutdown().await;
131 result?;
132 Ok(())
133}
134
135fn app(
136 storage: StoragePaths,
137 writers: LogWriterPool,
138 max_dump_bytes: u64,
139 token_store: TokenStore,
140 rate_limiter: RateLimiter,
141 metrics: Metrics,
142 schema_registry: SchemaRegistry,
143) -> Router {
144 let state = AppState {
145 storage,
146 max_dump_bytes,
147 rate_limiter,
148 metrics: metrics.clone(),
149 schema_registry,
150 };
151 let grpc = Routes::new(LogsServiceServer::new(LogsHandler::new(
152 writers,
153 state.rate_limiter.clone(),
154 state.metrics.clone(),
155 state.schema_registry.clone(),
156 )))
157 .into_axum_router();
158 let http = Router::new()
159 .route("/v1/crashes", post(crashes_handler))
160 .route("/healthz", get(healthz))
161 .route("/metrics", get(render_metrics))
162 .with_state(state);
163 let auth_state = AuthState {
164 token_store,
165 metrics,
166 };
167
168 http.merge(grpc)
169 .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
170 .layer(
171 ServiceBuilder::new()
172 .layer(TraceLayer::new_for_http())
173 .layer(RequestBodyLimitLayer::new(150 * 1024 * 1024))
174 .layer(RequestDecompressionLayer::new())
175 .layer(CompressionLayer::new()),
176 )
177}
178
179async fn healthz() -> StatusCode {
180 StatusCode::NO_CONTENT
181}
182
183async fn render_metrics(
184 axum::extract::State(state): axum::extract::State<AppState>,
185) -> impl IntoResponse {
186 match state.metrics.render() {
187 Ok(body) => (
188 [(
189 header::CONTENT_TYPE,
190 "application/openmetrics-text; version=1.0.0",
191 )],
192 body,
193 )
194 .into_response(),
195 Err(error) => (
196 StatusCode::INTERNAL_SERVER_ERROR,
197 format!("failed to render metrics: {error}"),
198 )
199 .into_response(),
200 }
201}
202
203async fn shutdown_signal() {
204 if let Err(error) = tokio::signal::ctrl_c().await {
205 tracing::error!(%error, "failed to install ctrl-c handler");
206 }
207}