otlp2parquet/
lib.rs

//! Server mode - Full-featured HTTP server with multi-backend storage
//!
//! This is the default/general-purpose mode that can run anywhere:
//! - Docker containers
//! - Kubernetes
//! - Local development
//! - VM instances
//!
//! Features:
//! - Axum HTTP server (HTTP/1.1, HTTP/2)
//! - Multi-backend storage (S3-compatible, filesystem)
//! - Structured logging with tracing
//! - Graceful shutdown
//! - Production-ready
15
16use anyhow::{Context, Result};
17use axum::{
18    http::StatusCode,
19    response::{IntoResponse, Response},
20    routing::{get, post},
21    Json, Router,
22};
23
24pub mod config;
25pub mod types;
26
27pub use config::{
28    BatchConfig, EnvSource, FsConfig, LogFormat, Platform, RequestConfig, RuntimeConfig,
29    ServerConfig, StorageBackend, StorageConfig, ENV_PREFIX,
30};
31pub use otlp2records::InputFormat;
32pub use types::{Blake3Hash, MetricType, SignalKey, SignalType};
33
34mod batch;
35pub mod codec;
36
37use batch::{BatchConfig as BatcherConfig, BatchManager};
38use serde_json::json;
39use std::sync::atomic::{AtomicBool, Ordering};
40use std::sync::Arc;
41use std::time::Duration;
42use tokio::signal;
43use tower_http::decompression::RequestDecompressionLayer;
44use tracing::{debug, error, info, warn};
45
46mod handlers;
47mod init;
48mod writer;
49
50pub mod connect;
51
52use handlers::{handle_logs, handle_metrics, handle_traces, health_check, ready_check};
53pub use init::init_tracing;
54use init::init_writer;
55
/// Application state shared across all requests
///
/// Cloned per-request by Axum; both fields are cheap to clone
/// (`Arc` pointer bump and a `usize` copy).
#[derive(Clone)]
pub(crate) struct AppState {
    /// Batch manager for buffering records before persistence;
    /// `None` when batching is disabled in the runtime config.
    pub batcher: Option<Arc<BatchManager>>,
    /// Maximum accepted request payload size in bytes
    /// (taken from `RequestConfig::max_payload_bytes`).
    pub max_payload_bytes: usize,
}
62
/// Error type that implements IntoResponse
///
/// Pairs an HTTP status code with the underlying `anyhow::Error`
/// so handlers can propagate failures with `?` while controlling
/// the status code returned to the client.
pub(crate) struct AppError {
    // HTTP status sent to the client.
    status: StatusCode,
    // Source error; its Display text becomes the JSON "error" field.
    error: anyhow::Error,
}
68
69impl IntoResponse for AppError {
70    fn into_response(self) -> Response {
71        error!("Request error: {:?}", self.error);
72        (
73            self.status,
74            Json(json!({
75                "error": self.error.to_string(),
76            })),
77        )
78            .into_response()
79    }
80}
81
82impl AppError {
83    pub fn with_status(status: StatusCode, error: anyhow::Error) -> Self {
84        Self { status, error }
85    }
86
87    pub fn bad_request<E>(error: E) -> Self
88    where
89        E: Into<anyhow::Error>,
90    {
91        Self {
92            status: StatusCode::BAD_REQUEST,
93            error: error.into(),
94        }
95    }
96
97    pub fn internal<E>(error: E) -> Self
98    where
99        E: Into<anyhow::Error>,
100    {
101        Self {
102            status: StatusCode::INTERNAL_SERVER_ERROR,
103            error: error.into(),
104        }
105    }
106}
107
108/// Graceful shutdown handler
109async fn shutdown_signal() {
110    let ctrl_c = async {
111        if let Err(e) = signal::ctrl_c().await {
112            tracing::error!("Failed to install Ctrl+C handler: {}", e);
113        }
114    };
115
116    #[cfg(unix)]
117    let terminate = async {
118        match signal::unix::signal(signal::unix::SignalKind::terminate()) {
119            Ok(mut sig) => {
120                sig.recv().await;
121            }
122            Err(e) => {
123                tracing::error!("Failed to install SIGTERM handler: {}", e);
124            }
125        }
126    };
127
128    #[cfg(not(unix))]
129    let terminate = std::future::pending::<()>();
130
131    tokio::select! {
132        _ = ctrl_c => {
133            info!("Received Ctrl+C, starting graceful shutdown...");
134        },
135        _ = terminate => {
136            info!("Received SIGTERM, starting graceful shutdown...");
137        },
138    }
139}
140
141/// Entry point for server mode (loads config automatically)
142pub async fn run() -> Result<()> {
143    let config = RuntimeConfig::load().context("Failed to load configuration")?;
144    run_with_config(config).await
145}
146
/// Entry point for server mode with pre-loaded configuration (for CLI usage)
///
/// Lifecycle:
/// 1. Initialize tracing and the storage writer from `config`.
/// 2. Build the Axum router (OTLP ingest + health endpoints) with gzip
///    request decompression.
/// 3. Spawn a periodic background flush task when batching is enabled.
/// 4. Serve until Ctrl+C / SIGTERM, then stop the flush task and drain
///    any still-buffered batches before returning.
///
/// # Errors
/// Returns an error if `config.server` is missing, storage initialization
/// fails, the listen address cannot be bound, the server itself errors,
/// or the final batch drain fails.
pub async fn run_with_config(config: RuntimeConfig) -> Result<()> {
    // Initialize tracing with config
    init_tracing(&config);

    info!("Server mode - full-featured HTTP server with multi-backend storage");

    // Server config (and thus the listen address) is mandatory in this mode.
    let addr = config
        .server
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("server config required"))?
        .listen_addr
        .clone();

    // Initialize storage
    init_writer(&config)?;

    // Translate runtime batch settings into the batcher's own config type.
    let batch_config = BatcherConfig {
        max_rows: config.batch.max_rows,
        max_bytes: config.batch.max_bytes,
        max_age: Duration::from_secs(config.batch.max_age_secs),
    };

    let batcher = if !config.batch.enabled {
        info!("Batching disabled by configuration");
        None
    } else {
        info!(
            "Batching enabled (max_rows={} max_bytes={} max_age={}s)",
            batch_config.max_rows,
            batch_config.max_bytes,
            batch_config.max_age.as_secs()
        );
        Some(Arc::new(BatchManager::new(batch_config)))
    };

    let max_payload_bytes = config.request.max_payload_bytes;
    info!("Max payload size set to {} bytes", max_payload_bytes);

    // Create app state
    let state = AppState {
        batcher,
        max_payload_bytes,
    };

    // One clone goes to the router; `state` itself is reused below for the
    // background flush task and the final shutdown drain.
    let router_state = state.clone();

    // Build router with gzip decompression support
    // OTel collectors typically send gzip-compressed payloads by default
    let app = Router::new()
        .route("/v1/logs", post(handle_logs))
        .route("/v1/traces", post(handle_traces))
        .route("/v1/metrics", post(handle_metrics))
        .route("/health", get(health_check))
        .route("/ready", get(ready_check))
        .layer(RequestDecompressionLayer::new().gzip(true))
        .with_state(router_state);

    // Create TCP listener
    let listener = tokio::net::TcpListener::bind(&addr)
        .await
        .context(format!("Failed to bind to {}", addr))?;

    info!("OTLP HTTP endpoint listening on http://{}", addr);
    info!("Routes:");
    info!("  POST http://{}/v1/logs    - OTLP log ingestion", addr);
    info!("  POST http://{}/v1/metrics - OTLP metrics ingestion", addr);
    info!("  POST http://{}/v1/traces  - OTLP trace ingestion", addr);
    info!("  GET  http://{}/health     - Health check", addr);
    info!("  GET  http://{}/ready      - Readiness check", addr);
    info!("Press Ctrl+C or send SIGTERM to stop");

    // Spawn background flush task if batching is enabled
    let shutdown_flag = Arc::new(AtomicBool::new(false));
    let flush_handle = if state.batcher.is_some() {
        let flush_state = state.clone();
        let flush_shutdown = Arc::clone(&shutdown_flag);
        // Wake at half the max batch age (clamped to >= 1s) so an expired
        // batch waits at most ~max_age/2 before being flushed.
        let flush_interval =
            Duration::from_secs(config.batch.max_age_secs.max(1) / 2).max(Duration::from_secs(1));
        Some(tokio::spawn(async move {
            run_background_flush(flush_state, flush_shutdown, flush_interval).await;
        }))
    } else {
        None
    };

    // Start server with graceful shutdown
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await
        .context("Server error")?;

    // Signal background task to stop and wait for it
    shutdown_flag.store(true, Ordering::SeqCst);
    if let Some(handle) = flush_handle {
        let _ = handle.await;
    }

    // Persist anything still buffered so accepted data is not lost on exit.
    flush_pending_batches(&state).await?;

    info!("Server shutdown complete");

    Ok(())
}
255
256async fn flush_pending_batches(state: &AppState) -> Result<()> {
257    if let Some(batcher) = &state.batcher {
258        let pending = batcher
259            .drain_all()
260            .context("Failed to drain pending log batches during shutdown")?;
261
262        if pending.is_empty() {
263            return Ok(());
264        }
265
266        info!(
267            batch_count = pending.len(),
268            "Flushing buffered log batches before shutdown"
269        );
270
271        for completed in pending {
272            let rows = completed.metadata.record_count;
273            let service = completed.metadata.service_name.as_ref().to_string();
274            match handlers::persist_log_batch(&completed).await {
275                Ok(paths) => {
276                    for path in paths {
277                        info!(
278                            path = %path,
279                            service_name = %service,
280                            rows,
281                            "Flushed pending batch"
282                        );
283                    }
284                }
285                Err(e) => {
286                    warn!(
287                        error = %e,
288                        service_name = %service,
289                        rows,
290                        "Failed to flush pending batch during shutdown"
291                    );
292                }
293            }
294        }
295    }
296
297    Ok(())
298}
299
300/// Background task that periodically flushes expired batches
301async fn run_background_flush(state: AppState, shutdown: Arc<AtomicBool>, interval: Duration) {
302    debug!(
303        "Background flush task started (interval={}s)",
304        interval.as_secs()
305    );
306
307    while !shutdown.load(Ordering::SeqCst) {
308        tokio::time::sleep(interval).await;
309
310        if shutdown.load(Ordering::SeqCst) {
311            break;
312        }
313
314        if let Some(batcher) = &state.batcher {
315            match batcher.drain_expired() {
316                Ok(expired) => {
317                    for completed in expired {
318                        let rows = completed.metadata.record_count;
319                        let service = completed.metadata.service_name.as_ref().to_string();
320                        match handlers::persist_log_batch(&completed).await {
321                            Ok(paths) => {
322                                for path in &paths {
323                                    info!(
324                                        path = %path,
325                                        service_name = %service,
326                                        rows,
327                                        "Flushed expired batch"
328                                    );
329                                }
330                            }
331                            Err(e) => {
332                                warn!(
333                                    error = %e,
334                                    service_name = %service,
335                                    rows,
336                                    "Failed to flush expired batch"
337                                );
338                            }
339                        }
340                    }
341                }
342                Err(e) => {
343                    warn!(error = %e, "Failed to drain expired batches");
344                }
345            }
346        }
347    }
348
349    debug!("Background flush task stopped");
350}