Skip to main content

tael_server/
lib.rs

1//! tael-server: OTLP ingest, tiered storage, and the REST/gRPC query API.
2//!
3//! Shipped as a library so the `tael` binary can embed it behind `tael serve`
4//! (a single `cargo install`), while still being usable as a standalone crate.
5//! [`run`] is the entrypoint; [`ServerConfig`] configures it.
6
7mod api;
8mod config;
9mod ingest;
10mod log_bus;
11mod promql;
12mod span_bus;
13mod storage;
14
15use std::sync::Arc;
16
17use anyhow::Result;
18use tokio::net::TcpListener;
19use tonic::transport::Server as TonicServer;
20use tracing_subscriber::EnvFilter;
21
22pub use config::{ServerConfig, StorageBackend};
23
24use log_bus::LogBus;
25use span_bus::SpanBus;
26use storage::{BlobStore, DuckDbStore, Store, TaelBackend};
27
28/// Periodically roll spans older than the hot-tier window into the cold tier.
29/// Runs the (blocking) compaction off the async executor. The window
30/// (`retention.traces.hot_tier`, default 24h) and interval are env-tunable
31/// (`TAEL_HOT_TIER_HOURS`, `TAEL_COMPACT_INTERVAL_SECS`) until retention config
32/// lands (Phase 7); a 0-hour window compacts everything (used in tests).
33fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
34    let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
35        .ok()
36        .and_then(|s| s.parse().ok())
37        .unwrap_or(24);
38    let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
39        .ok()
40        .and_then(|s| s.parse().ok())
41        .unwrap_or(3600);
42    // Span metadata retention (`retention.traces.metadata`, default 365d).
43    let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
44        .ok()
45        .and_then(|s| s.parse().ok())
46        .unwrap_or(365);
47    tokio::spawn(async move {
48        let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
49        loop {
50            tick.tick().await;
51            let backend = Arc::clone(&backend);
52            let blobs = Arc::clone(&blobs);
53            let result = tokio::task::spawn_blocking(move || {
54                let now = chrono::Utc::now();
55                let hot_cutoff = now - chrono::Duration::hours(window_hours);
56                let mut compacted = backend.compact_spans(hot_cutoff)?;
57                compacted += backend.compact_logs_metrics(hot_cutoff)?;
58                let dropped =
59                    backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
60                // Payload blob GC: drop blobs no live row references (e.g. rows
61                // just removed by retention). Runs after partition drops.
62                let live = backend.collect_live_blob_hashes()?;
63                let blobs_gcd = blobs.gc(&live)?;
64                anyhow::Ok((compacted, dropped, blobs_gcd))
65            })
66            .await;
67            match result {
68                Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
69                    compacted = c,
70                    partitions_dropped = d,
71                    blobs_gcd = g,
72                    "tael-backend maintenance"
73                ),
74                Ok(Ok(_)) => {}
75                Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
76                Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
77            }
78        }
79    });
80}
81
82/// Start the server: OTLP gRPC + REST listeners over the configured storage
83/// backend, plus the background maintenance task when running on tael-backend.
84/// Runs until one of the listeners exits.
85pub async fn run(config: ServerConfig) -> Result<()> {
86    // Initialize tracing for the server process. `try_init` so embedding this in
87    // a binary that already set a subscriber is a no-op rather than a panic.
88    let _ = tracing_subscriber::fmt()
89        .with_env_filter(EnvFilter::from_default_env())
90        .try_init();
91
92    let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
93    // The payload search index is shared between the ingest path (writes) and
94    // the tael-backend query path (reads); present only when that engine runs.
95    let mut search: Option<Arc<storage::SearchIndex>> = None;
96    let store: Arc<dyn Store> = match config.storage {
97        StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
98        StorageBackend::TaelBackend => {
99            let backend = Arc::new(TaelBackend::new(&config.data_dir)?);
100            search = Some(backend.search_index());
101            spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
102            backend as Arc<dyn Store>
103        }
104    };
105    let bus = Arc::new(SpanBus::new()?);
106    let log_bus = Arc::new(LogBus::new()?);
107
108    tracing::info!(
109        otlp_grpc = %config.otlp_grpc_addr,
110        rest_api = %config.rest_api_addr,
111        data_dir = %config.data_dir,
112        storage = ?config.storage,
113        "starting tael server"
114    );
115
116    let grpc_handle = tokio::spawn({
117        let store = Arc::clone(&store);
118        let blobs = Arc::clone(&blobs);
119        let bus = Arc::clone(&bus);
120        let log_bus = Arc::clone(&log_bus);
121        let addr = config.otlp_grpc_addr.parse()?;
122        async move {
123            let trace_service = ingest::otlp::OtlpTraceService::new(
124                Arc::clone(&store),
125                Arc::clone(&blobs),
126                search.clone(),
127                bus,
128            );
129            let logs_service = ingest::otlp_logs::OtlpLogsService::new(
130                Arc::clone(&store),
131                Arc::clone(&blobs),
132                log_bus,
133            );
134            let metrics_service =
135                ingest::otlp_metrics::OtlpMetricsService::new(store);
136            TonicServer::builder()
137                .add_service(
138                    opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
139                )
140                .add_service(
141                    opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
142                )
143                .add_service(
144                    opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
145                )
146                .serve(addr)
147                .await
148                .expect("gRPC server failed");
149        }
150    });
151
152    let rest_handle = tokio::spawn({
153        let store = Arc::clone(&store);
154        let blobs = Arc::clone(&blobs);
155        let bus = Arc::clone(&bus);
156        let log_bus = Arc::clone(&log_bus);
157        let addr = config.rest_api_addr.clone();
158        async move {
159            let app = api::rest::router(store, blobs, bus, log_bus);
160            let listener = TcpListener::bind(&addr).await.expect("failed to bind REST addr");
161            tracing::info!(%addr, "REST API listening");
162            axum::serve(listener, app).await.expect("REST server failed");
163        }
164    });
165
166    tokio::select! {
167        r = grpc_handle => r?,
168        r = rest_handle => r?,
169    }
170
171    Ok(())
172}