1use anyhow::{Context, Result};
17use axum::{
18 http::StatusCode,
19 response::{IntoResponse, Response},
20 routing::{get, post},
21 Json, Router,
22};
23
24pub mod config;
25pub mod types;
26
27pub use config::{
28 BatchConfig, EnvSource, FsConfig, LogFormat, Platform, RequestConfig, RuntimeConfig,
29 ServerConfig, StorageBackend, StorageConfig, ENV_PREFIX,
30};
31pub use otlp2records::InputFormat;
32pub use types::{Blake3Hash, MetricType, SignalKey, SignalType};
33
34mod batch;
35pub mod codec;
36
37use batch::{BatchConfig as BatcherConfig, BatchManager};
38use serde_json::json;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::sync::Arc;
41use std::time::Duration;
42use tokio::signal;
43use tower_http::decompression::RequestDecompressionLayer;
44use tracing::{debug, error, info, warn};
45
46mod handlers;
47mod init;
48mod writer;
49
50pub mod connect;
51
52use handlers::{handle_logs, handle_metrics, handle_traces, health_check, ready_check};
53pub use init::init_tracing;
54use init::init_writer;
55
56#[derive(Clone)]
58pub(crate) struct AppState {
59 pub batcher: Option<Arc<BatchManager>>,
60 pub max_payload_bytes: usize,
61}
62
63pub(crate) struct AppError {
65 status: StatusCode,
66 error: anyhow::Error,
67}
68
69impl IntoResponse for AppError {
70 fn into_response(self) -> Response {
71 error!("Request error: {:?}", self.error);
72 (
73 self.status,
74 Json(json!({
75 "error": self.error.to_string(),
76 })),
77 )
78 .into_response()
79 }
80}
81
82impl AppError {
83 pub fn with_status(status: StatusCode, error: anyhow::Error) -> Self {
84 Self { status, error }
85 }
86
87 pub fn bad_request<E>(error: E) -> Self
88 where
89 E: Into<anyhow::Error>,
90 {
91 Self {
92 status: StatusCode::BAD_REQUEST,
93 error: error.into(),
94 }
95 }
96
97 pub fn internal<E>(error: E) -> Self
98 where
99 E: Into<anyhow::Error>,
100 {
101 Self {
102 status: StatusCode::INTERNAL_SERVER_ERROR,
103 error: error.into(),
104 }
105 }
106}
107
108async fn shutdown_signal() {
110 let ctrl_c = async {
111 if let Err(e) = signal::ctrl_c().await {
112 tracing::error!("Failed to install Ctrl+C handler: {}", e);
113 }
114 };
115
116 #[cfg(unix)]
117 let terminate = async {
118 match signal::unix::signal(signal::unix::SignalKind::terminate()) {
119 Ok(mut sig) => {
120 sig.recv().await;
121 }
122 Err(e) => {
123 tracing::error!("Failed to install SIGTERM handler: {}", e);
124 }
125 }
126 };
127
128 #[cfg(not(unix))]
129 let terminate = std::future::pending::<()>();
130
131 tokio::select! {
132 _ = ctrl_c => {
133 info!("Received Ctrl+C, starting graceful shutdown...");
134 },
135 _ = terminate => {
136 info!("Received SIGTERM, starting graceful shutdown...");
137 },
138 }
139}
140
141pub async fn run() -> Result<()> {
143 let config = RuntimeConfig::load().context("Failed to load configuration")?;
144 run_with_config(config).await
145}
146
147pub async fn run_with_config(config: RuntimeConfig) -> Result<()> {
149 init_tracing(&config);
151
152 info!("Server mode - full-featured HTTP server with multi-backend storage");
155
156 let addr = config
158 .server
159 .as_ref()
160 .ok_or_else(|| anyhow::anyhow!("server config required"))?
161 .listen_addr
162 .clone();
163
164 init_writer(&config)?;
166
167 let batch_config = BatcherConfig {
169 max_rows: config.batch.max_rows,
170 max_bytes: config.batch.max_bytes,
171 max_age: Duration::from_secs(config.batch.max_age_secs),
172 };
173
174 let batcher = if !config.batch.enabled {
175 info!("Batching disabled by configuration");
176 None
177 } else {
178 info!(
179 "Batching enabled (max_rows={} max_bytes={} max_age={}s)",
180 batch_config.max_rows,
181 batch_config.max_bytes,
182 batch_config.max_age.as_secs()
183 );
184 Some(Arc::new(BatchManager::new(batch_config)))
185 };
186
187 let max_payload_bytes = config.request.max_payload_bytes;
188 info!("Max payload size set to {} bytes", max_payload_bytes);
189
190 let state = AppState {
192 batcher,
193 max_payload_bytes,
194 };
195
196 let router_state = state.clone();
197
198 let app = Router::new()
201 .route("/v1/logs", post(handle_logs))
202 .route("/v1/traces", post(handle_traces))
203 .route("/v1/metrics", post(handle_metrics))
204 .route("/health", get(health_check))
205 .route("/ready", get(ready_check))
206 .layer(RequestDecompressionLayer::new().gzip(true))
207 .with_state(router_state);
208
209 let listener = tokio::net::TcpListener::bind(&addr)
211 .await
212 .context(format!("Failed to bind to {}", addr))?;
213
214 info!("OTLP HTTP endpoint listening on http://{}", addr);
215 info!("Routes:");
216 info!(" POST http://{}/v1/logs - OTLP log ingestion", addr);
217 info!(" POST http://{}/v1/metrics - OTLP metrics ingestion", addr);
218 info!(" POST http://{}/v1/traces - OTLP trace ingestion", addr);
219 info!(" GET http://{}/health - Health check", addr);
220 info!(" GET http://{}/ready - Readiness check", addr);
221 info!("Press Ctrl+C or send SIGTERM to stop");
222
223 let shutdown_flag = Arc::new(AtomicBool::new(false));
225 let flush_handle = if state.batcher.is_some() {
226 let flush_state = state.clone();
227 let flush_shutdown = Arc::clone(&shutdown_flag);
228 let flush_interval =
229 Duration::from_secs(config.batch.max_age_secs.max(1) / 2).max(Duration::from_secs(1));
230 Some(tokio::spawn(async move {
231 run_background_flush(flush_state, flush_shutdown, flush_interval).await;
232 }))
233 } else {
234 None
235 };
236
237 axum::serve(listener, app)
239 .with_graceful_shutdown(shutdown_signal())
240 .await
241 .context("Server error")?;
242
243 shutdown_flag.store(true, Ordering::SeqCst);
245 if let Some(handle) = flush_handle {
246 let _ = handle.await;
247 }
248
249 flush_pending_batches(&state).await?;
250
251 info!("Server shutdown complete");
252
253 Ok(())
254}
255
256async fn flush_pending_batches(state: &AppState) -> Result<()> {
257 if let Some(batcher) = &state.batcher {
258 let pending = batcher
259 .drain_all()
260 .context("Failed to drain pending log batches during shutdown")?;
261
262 if pending.is_empty() {
263 return Ok(());
264 }
265
266 info!(
267 batch_count = pending.len(),
268 "Flushing buffered log batches before shutdown"
269 );
270
271 for completed in pending {
272 let rows = completed.metadata.record_count;
273 let service = completed.metadata.service_name.as_ref().to_string();
274 match handlers::persist_log_batch(&completed).await {
275 Ok(paths) => {
276 for path in paths {
277 info!(
278 path = %path,
279 service_name = %service,
280 rows,
281 "Flushed pending batch"
282 );
283 }
284 }
285 Err(e) => {
286 warn!(
287 error = %e,
288 service_name = %service,
289 rows,
290 "Failed to flush pending batch during shutdown"
291 );
292 }
293 }
294 }
295 }
296
297 Ok(())
298}
299
300async fn run_background_flush(state: AppState, shutdown: Arc<AtomicBool>, interval: Duration) {
302 debug!(
303 "Background flush task started (interval={}s)",
304 interval.as_secs()
305 );
306
307 while !shutdown.load(Ordering::SeqCst) {
308 tokio::time::sleep(interval).await;
309
310 if shutdown.load(Ordering::SeqCst) {
311 break;
312 }
313
314 if let Some(batcher) = &state.batcher {
315 match batcher.drain_expired() {
316 Ok(expired) => {
317 for completed in expired {
318 let rows = completed.metadata.record_count;
319 let service = completed.metadata.service_name.as_ref().to_string();
320 match handlers::persist_log_batch(&completed).await {
321 Ok(paths) => {
322 for path in &paths {
323 info!(
324 path = %path,
325 service_name = %service,
326 rows,
327 "Flushed expired batch"
328 );
329 }
330 }
331 Err(e) => {
332 warn!(
333 error = %e,
334 service_name = %service,
335 rows,
336 "Failed to flush expired batch"
337 );
338 }
339 }
340 }
341 }
342 Err(e) => {
343 warn!(error = %e, "Failed to drain expired batches");
344 }
345 }
346 }
347 }
348
349 debug!("Background flush task stopped");
350}