Skip to main content

detritus_server/
server.rs

1use std::{future::Future, net::SocketAddr, path::PathBuf};
2
3use axum::{
4    Router,
5    http::{StatusCode, header},
6    middleware,
7    response::IntoResponse,
8    routing::{get, post},
9};
10use detritus_protocol::otlp::logs::LogsServiceServer;
11use tokio::net::TcpListener;
12use tonic::service::Routes;
13use tower::ServiceBuilder;
14use tower_http::{
15    compression::CompressionLayer, decompression::RequestDecompressionLayer,
16    limit::RequestBodyLimitLayer, trace::TraceLayer,
17};
18
19use crate::{
20    auth::{AuthState, TokenStore, auth_middleware},
21    crashes::crashes_handler,
22    janitor::{RetentionConfig, spawn_janitor},
23    logs::{LogWriterPool, LogsHandler},
24    metrics::Metrics,
25    rate_limit::{RateLimitConfig, RateLimiter},
26    schemas::SchemaRegistry,
27    storage::StoragePaths,
28};
29
30/// Runtime configuration for an embedded Detritus server.
31#[derive(Clone)]
32pub struct ServerConfig {
33    /// Socket address to bind when using [`serve`].
34    pub bind: SocketAddr,
35    /// Root directory for logs, crash blobs, indexes, and temporary files.
36    pub data_dir: PathBuf,
37    /// Maximum accepted crash dump or attachment size in bytes.
38    pub max_dump_bytes: u64,
39    /// Bearer-token store used by HTTP and gRPC authentication.
40    pub token_store: TokenStore,
41    /// Per-source and per-token rate limit configuration.
42    pub rate_limit: RateLimitConfig,
43    /// Retention policy for logs, crash indexes, and unreferenced blobs.
44    pub retention: RetentionConfig,
45    /// Tenant JSON Schema registry; use [`SchemaRegistry::empty`] to disable
46    /// schema validation for all tenants.
47    pub schema_registry: SchemaRegistry,
48}
49
50#[derive(Clone)]
51pub(crate) struct AppState {
52    pub(crate) storage: StoragePaths,
53    pub(crate) max_dump_bytes: u64,
54    pub(crate) rate_limiter: RateLimiter,
55    pub(crate) metrics: Metrics,
56    pub(crate) schema_registry: SchemaRegistry,
57}
58
59/// Runs a Detritus server until the process receives Ctrl-C.
60///
61/// # Errors
62///
63/// Returns an error if binding the listener, preparing storage, initializing
64/// metrics, or serving requests fails.
65pub async fn serve(config: ServerConfig) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
66    let listener = TcpListener::bind(config.bind).await?;
67    serve_with_shutdown(listener, config, shutdown_signal()).await
68}
69
70/// Runs a Detritus server on an existing listener until `shutdown` resolves.
71///
72/// # Errors
73///
74/// Returns an error if storage preparation, metrics initialization, listener
75/// inspection, or the HTTP/gRPC server fails.
76///
77/// # Examples
78///
79/// ```no_run
80/// use detritus_server::{
81///     RateLimitConfig, RetentionConfig, SchemaRegistry, ServerConfig, TestToken, TokenStore,
82///     serve_with_shutdown,
83/// };
84/// use tokio::net::TcpListener;
85///
86/// # async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
87/// let listener = TcpListener::bind("127.0.0.1:0").await?;
88/// let bind = listener.local_addr()?;
89/// let config = ServerConfig {
90///     bind,
91///     data_dir: std::env::temp_dir().join("detritus"),
92///     max_dump_bytes: 100 * 1024 * 1024,
93///     token_store: TokenStore::for_tests(Vec::<TestToken>::new()),
94///     rate_limit: RateLimitConfig::default(),
95///     retention: RetentionConfig::default(),
96///     schema_registry: SchemaRegistry::empty(),
97/// };
98///
99/// serve_with_shutdown(listener, config, async {}).await?;
100/// # Ok(())
101/// # }
102/// ```
103pub async fn serve_with_shutdown(
104    listener: TcpListener,
105    config: ServerConfig,
106    shutdown: impl Future<Output = ()> + Send + 'static,
107) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
108    let storage = StoragePaths::new(config.data_dir);
109    storage.prepare().await?;
110    let metrics = Metrics::new()?;
111    let rate_limiter = RateLimiter::new(config.rate_limit);
112    let writers = LogWriterPool::new(storage.clone());
113    let janitor = spawn_janitor(storage.clone(), config.retention, metrics.clone());
114    let app = app(
115        storage,
116        writers.clone(),
117        config.max_dump_bytes,
118        config.token_store,
119        rate_limiter,
120        metrics,
121        config.schema_registry,
122    );
123    let addr = listener.local_addr()?;
124    tracing::info!(%addr, "observability server listening");
125
126    let result = axum::serve(listener, app)
127        .with_graceful_shutdown(shutdown)
128        .await;
129    janitor.abort();
130    writers.shutdown().await;
131    result?;
132    Ok(())
133}
134
135fn app(
136    storage: StoragePaths,
137    writers: LogWriterPool,
138    max_dump_bytes: u64,
139    token_store: TokenStore,
140    rate_limiter: RateLimiter,
141    metrics: Metrics,
142    schema_registry: SchemaRegistry,
143) -> Router {
144    let state = AppState {
145        storage,
146        max_dump_bytes,
147        rate_limiter,
148        metrics: metrics.clone(),
149        schema_registry,
150    };
151    let grpc = Routes::new(LogsServiceServer::new(LogsHandler::new(
152        writers,
153        state.rate_limiter.clone(),
154        state.metrics.clone(),
155        state.schema_registry.clone(),
156    )))
157    .into_axum_router();
158    let http = Router::new()
159        .route("/v1/crashes", post(crashes_handler))
160        .route("/healthz", get(healthz))
161        .route("/metrics", get(render_metrics))
162        .with_state(state);
163    let auth_state = AuthState {
164        token_store,
165        metrics,
166    };
167
168    http.merge(grpc)
169        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
170        .layer(
171            ServiceBuilder::new()
172                .layer(TraceLayer::new_for_http())
173                .layer(RequestBodyLimitLayer::new(150 * 1024 * 1024))
174                .layer(RequestDecompressionLayer::new())
175                .layer(CompressionLayer::new()),
176        )
177}
178
179async fn healthz() -> StatusCode {
180    StatusCode::NO_CONTENT
181}
182
183async fn render_metrics(
184    axum::extract::State(state): axum::extract::State<AppState>,
185) -> impl IntoResponse {
186    match state.metrics.render() {
187        Ok(body) => (
188            [(
189                header::CONTENT_TYPE,
190                "application/openmetrics-text; version=1.0.0",
191            )],
192            body,
193        )
194            .into_response(),
195        Err(error) => (
196            StatusCode::INTERNAL_SERVER_ERROR,
197            format!("failed to render metrics: {error}"),
198        )
199            .into_response(),
200    }
201}
202
203async fn shutdown_signal() {
204    if let Err(error) = tokio::signal::ctrl_c().await {
205        tracing::error!(%error, "failed to install ctrl-c handler");
206    }
207}