Skip to main content

tael_server/
lib.rs

1//! tael-server: OTLP ingest, tiered storage, and the REST/gRPC query API.
2//!
3//! Shipped as a library so the `tael` binary can embed it behind `tael serve`
4//! (a single `cargo install`), while still being usable as a standalone crate.
5//! [`run`] is the entrypoint; [`ServerConfig`] configures it.
6
7mod api;
8mod cluster;
9mod config;
10mod ingest;
11mod log_bus;
12mod promql;
13mod span_bus;
14mod storage;
15
16use std::sync::Arc;
17
18use anyhow::{Context, Result};
19use tokio::net::TcpListener;
20use tonic::transport::Server as TonicServer;
21use tracing_subscriber::EnvFilter;
22
23pub use config::{ServerConfig, StorageBackend};
24
25use log_bus::LogBus;
26use span_bus::SpanBus;
27use storage::{
28    BlobStore, DuckDbStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
29};
30
31/// Periodically roll spans older than the hot-tier window into the cold tier.
32/// Runs the (blocking) compaction off the async executor. The window
33/// (`retention.traces.hot_tier`, default 24h) and interval are env-tunable
34/// (`TAEL_HOT_TIER_HOURS`, `TAEL_COMPACT_INTERVAL_SECS`) until retention config
35/// lands (Phase 7); a 0-hour window compacts everything (used in tests).
36fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
37    let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
38        .ok()
39        .and_then(|s| s.parse().ok())
40        .unwrap_or(24);
41    let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
42        .ok()
43        .and_then(|s| s.parse().ok())
44        .unwrap_or(3600);
45    // Span metadata retention (`retention.traces.metadata`, default 365d).
46    let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
47        .ok()
48        .and_then(|s| s.parse().ok())
49        .unwrap_or(365);
50    tokio::spawn(async move {
51        let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
52        loop {
53            tick.tick().await;
54            let backend = Arc::clone(&backend);
55            let blobs = Arc::clone(&blobs);
56            let result = tokio::task::spawn_blocking(move || {
57                let now = chrono::Utc::now();
58                let hot_cutoff = now - chrono::Duration::hours(window_hours);
59                let mut compacted = backend.compact_spans(hot_cutoff)?;
60                compacted += backend.compact_logs_metrics(hot_cutoff)?;
61                let dropped =
62                    backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
63                // Payload blob GC: drop blobs no live row references (e.g. rows
64                // just removed by retention). Runs after partition drops.
65                let live = backend.collect_live_blob_hashes()?;
66                let blobs_gcd = blobs.gc(&live)?;
67                anyhow::Ok((compacted, dropped, blobs_gcd))
68            })
69            .await;
70            match result {
71                Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
72                    compacted = c,
73                    partitions_dropped = d,
74                    blobs_gcd = g,
75                    "tael-backend maintenance"
76                ),
77                Ok(Ok(_)) => {}
78                Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
79                Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
80            }
81        }
82    });
83}
84
85/// Start the server: OTLP gRPC + REST listeners over the configured storage
86/// backend, plus the background maintenance task when running on tael-backend.
87/// Runs until one of the listeners exits.
88pub async fn run(config: ServerConfig) -> Result<()> {
89    // Initialize tracing for the server process. `try_init` so embedding this in
90    // a binary that already set a subscriber is a no-op rather than a panic.
91    let _ = tracing_subscriber::fmt()
92        .with_env_filter(EnvFilter::from_default_env())
93        .try_init();
94
95    let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
96
97    // Cluster coordination (chitchat): automatic leader election + epoch fencing
98    // of WAL replication (§5.1). On when TAEL_CLUSTER_LISTEN is set.
99    let coordinator = match &config.cluster {
100        Some(cs) => {
101            let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
102                node_id: cs.node_id.clone(),
103                listen_addr: cs.listen_addr.parse().context("parsing TAEL_CLUSTER_LISTEN")?,
104                advertise_addr: cs
105                    .advertise_addr
106                    .parse()
107                    .context("parsing TAEL_CLUSTER_ADVERTISE")?,
108                seeds: cs.seeds.clone(),
109                cluster_id: cs.cluster_id.clone(),
110            })
111            .await?;
112            Some(coord)
113        }
114        None => None,
115    };
116
117    // The payload search index is shared between the ingest path (writes) and
118    // the tael-backend query path (reads); present only when that engine runs.
119    let mut search: Option<Arc<storage::SearchIndex>> = None;
120    let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
121        // Stateless query-tier mode: serve reads by scatter-gather over remote
122        // shards, no local engine (`docs/tael-server-scaling-ha.md` §3, Phase 2).
123        let shards = config
124            .query_shards
125            .iter()
126            .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
127            .collect::<Result<Vec<_>>>()?;
128        tracing::info!(
129            shards = shards.len(),
130            "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
131        );
132        Arc::new(FanoutStore::new(shards)?)
133    } else {
134        match config.storage {
135            StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
136            StorageBackend::TaelBackend => {
137                // WAL replication: when standbys are configured, this node is a
138                // leader that ships every appended record to them before acking
139                // (§5.1). With no standbys the write path is unchanged.
140                let sinks: Vec<Arc<dyn WalSink>> = config
141                    .wal_standbys
142                    .iter()
143                    .map(|url| {
144                        // Stamp the leader epoch (for standby fencing) when a
145                        // coordinator is running; otherwise ship unfenced.
146                        let sink = match &coordinator {
147                            Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
148                            None => RemoteWalSink::new(url),
149                        };
150                        sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
151                    })
152                    .collect::<Result<Vec<_>>>()?;
153                let backend = Arc::new(if sinks.is_empty() {
154                    TaelBackend::new(&config.data_dir)?
155                } else {
156                    tracing::info!(
157                        standbys = sinks.len(),
158                        required_acks = ?config.wal_required_acks,
159                        "WAL replication enabled: shipping to standbys (leader)"
160                    );
161                    TaelBackend::with_wal_key_and_sinks(
162                        &config.data_dir,
163                        "tael-backend",
164                        sinks,
165                        config.wal_required_acks,
166                    )?
167                });
168                search = Some(backend.search_index());
169                spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
170                backend as Arc<dyn Store>
171            }
172        }
173    };
174    let bus = Arc::new(SpanBus::new()?);
175    let log_bus = Arc::new(LogBus::new()?);
176
177    tracing::info!(
178        otlp_grpc = %config.otlp_grpc_addr,
179        rest_api = %config.rest_api_addr,
180        data_dir = %config.data_dir,
181        storage = ?config.storage,
182        "starting tael server"
183    );
184
185    let grpc_handle = tokio::spawn({
186        let store = Arc::clone(&store);
187        let blobs = Arc::clone(&blobs);
188        let bus = Arc::clone(&bus);
189        let log_bus = Arc::clone(&log_bus);
190        let addr = config.otlp_grpc_addr.parse()?;
191        async move {
192            let trace_service = ingest::otlp::OtlpTraceService::new(
193                Arc::clone(&store),
194                Arc::clone(&blobs),
195                search.clone(),
196                bus,
197            );
198            let logs_service = ingest::otlp_logs::OtlpLogsService::new(
199                Arc::clone(&store),
200                Arc::clone(&blobs),
201                log_bus,
202            );
203            let metrics_service =
204                ingest::otlp_metrics::OtlpMetricsService::new(store);
205            TonicServer::builder()
206                .add_service(
207                    opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
208                )
209                .add_service(
210                    opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
211                )
212                .add_service(
213                    opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
214                )
215                .serve_with_shutdown(addr, shutdown_signal())
216                .await
217                .expect("gRPC server failed");
218        }
219    });
220
221    let rest_handle = tokio::spawn({
222        let store = Arc::clone(&store);
223        let blobs = Arc::clone(&blobs);
224        let bus = Arc::clone(&bus);
225        let log_bus = Arc::clone(&log_bus);
226        let cluster = coordinator.clone();
227        let addr = config.rest_api_addr.clone();
228        async move {
229            let app = api::rest::router(store, blobs, bus, log_bus, cluster);
230            let listener = TcpListener::bind(&addr).await.expect("failed to bind REST addr");
231            tracing::info!(%addr, "REST API listening");
232            axum::serve(listener, app)
233                .with_graceful_shutdown(shutdown_signal())
234                .await
235                .expect("REST server failed");
236        }
237    });
238
239    // Both listeners drain on SIGTERM/Ctrl-C; await both so in-flight requests
240    // finish before we flush and exit (`docs/tael-server-scaling-ha.md` §5.4).
241    let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
242    grpc_res?;
243    rest_res?;
244
245    // Best-effort flush so a restart/standby replays less WAL. Durability is
246    // already guaranteed by the per-write WAL fsync.
247    if let Err(e) = store.flush() {
248        tracing::warn!(error = %e, "flush on shutdown failed");
249    }
250    tracing::info!("tael server stopped");
251
252    Ok(())
253}
254
255/// Resolve when the process is asked to stop: Ctrl-C, or SIGTERM on Unix
256/// (the orchestrator's graceful-stop signal). Both listeners await their own
257/// copy; the OS delivers the signal to every registered handler.
258async fn shutdown_signal() {
259    let ctrl_c = async {
260        let _ = tokio::signal::ctrl_c().await;
261    };
262
263    #[cfg(unix)]
264    let terminate = async {
265        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
266            Ok(mut s) => {
267                s.recv().await;
268            }
269            Err(e) => {
270                tracing::warn!(error = %e, "failed to install SIGTERM handler");
271                std::future::pending::<()>().await;
272            }
273        }
274    };
275
276    #[cfg(not(unix))]
277    let terminate = std::future::pending::<()>();
278
279    tokio::select! {
280        _ = ctrl_c => {}
281        _ = terminate => {}
282    }
283    tracing::info!("shutdown signal received; draining listeners");
284}