Skip to main content

tael_server/
lib.rs

1//! tael-server: OTLP ingest, tiered storage, and the REST/gRPC query API.
2//!
3//! Shipped as a library so the `tael` binary can embed it behind `tael serve`
4//! (a single `cargo install`), while still being usable as a standalone crate.
5//! [`run`] is the entrypoint; [`ServerConfig`] configures it.
6
7mod api;
8mod cluster;
9mod config;
10mod ingest;
11mod log_bus;
12mod promql;
13mod span_bus;
14mod storage;
15
16use std::sync::Arc;
17
18use anyhow::{Context, Result};
19use tokio::net::TcpListener;
20use tonic::transport::Server as TonicServer;
21use tracing_subscriber::EnvFilter;
22
23pub use config::{ServerConfig, StorageBackend};
24pub use storage::models::{
25    LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
26    TraceQuery,
27};
28pub use storage::{
29    BlobStore, DuckDbStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
30};
31
32use log_bus::LogBus;
33use span_bus::SpanBus;
34
35/// Periodically roll spans older than the hot-tier window into the cold tier.
36/// Runs the (blocking) compaction off the async executor. The window
37/// (`retention.traces.hot_tier`, default 24h) and interval are env-tunable
38/// (`TAEL_HOT_TIER_HOURS`, `TAEL_COMPACT_INTERVAL_SECS`) until retention config
39/// lands (Phase 7); a 0-hour window compacts everything (used in tests).
40fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
41    let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
42        .ok()
43        .and_then(|s| s.parse().ok())
44        .unwrap_or(24);
45    let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
46        .ok()
47        .and_then(|s| s.parse().ok())
48        .unwrap_or(3600);
49    // Span metadata retention (`retention.traces.metadata`, default 365d).
50    let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
51        .ok()
52        .and_then(|s| s.parse().ok())
53        .unwrap_or(365);
54    tokio::spawn(async move {
55        let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
56        loop {
57            tick.tick().await;
58            let backend = Arc::clone(&backend);
59            let blobs = Arc::clone(&blobs);
60            let result = tokio::task::spawn_blocking(move || {
61                let now = chrono::Utc::now();
62                let hot_cutoff = now - chrono::Duration::hours(window_hours);
63                let mut compacted = backend.compact_spans(hot_cutoff)?;
64                compacted += backend.compact_logs_metrics(hot_cutoff)?;
65                let dropped =
66                    backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
67                // Payload blob GC: drop blobs no live row references (e.g. rows
68                // just removed by retention). Runs after partition drops.
69                let live = backend.collect_live_blob_hashes()?;
70                let blobs_gcd = blobs.gc(&live)?;
71                anyhow::Ok((compacted, dropped, blobs_gcd))
72            })
73            .await;
74            match result {
75                Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
76                    compacted = c,
77                    partitions_dropped = d,
78                    blobs_gcd = g,
79                    "tael-backend maintenance"
80                ),
81                Ok(Ok(_)) => {}
82                Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
83                Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
84            }
85        }
86    });
87}
88
89/// Start the server: OTLP gRPC + REST listeners over the configured storage
90/// backend, plus the background maintenance task when running on tael-backend.
91/// Runs until one of the listeners exits.
92pub async fn run(config: ServerConfig) -> Result<()> {
93    // Initialize tracing for the server process. `try_init` so embedding this in
94    // a binary that already set a subscriber is a no-op rather than a panic.
95    let _ = tracing_subscriber::fmt()
96        .with_env_filter(EnvFilter::from_default_env())
97        .try_init();
98
99    configure_walrus_data_dir(&config.wal_dir);
100
101    let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
102
103    // Cluster coordination (chitchat): automatic leader election + epoch fencing
104    // of WAL replication (§5.1). On when TAEL_CLUSTER_LISTEN is set.
105    let coordinator = match &config.cluster {
106        Some(cs) => {
107            let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
108                node_id: cs.node_id.clone(),
109                listen_addr: cs
110                    .listen_addr
111                    .parse()
112                    .context("parsing TAEL_CLUSTER_LISTEN")?,
113                advertise_addr: cs
114                    .advertise_addr
115                    .parse()
116                    .context("parsing TAEL_CLUSTER_ADVERTISE")?,
117                seeds: cs.seeds.clone(),
118                cluster_id: cs.cluster_id.clone(),
119            })
120            .await?;
121            Some(coord)
122        }
123        None => None,
124    };
125
126    // The payload search index is shared between the ingest path (writes) and
127    // the tael-backend query path (reads); present only when that engine runs.
128    let mut search: Option<Arc<storage::SearchIndex>> = None;
129    let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
130        // Stateless query-tier mode: serve reads by scatter-gather over remote
131        // shards, no local engine (`docs/tael-server-scaling-ha.md` §3, Phase 2).
132        let shards = config
133            .query_shards
134            .iter()
135            .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
136            .collect::<Result<Vec<_>>>()?;
137        tracing::info!(
138            shards = shards.len(),
139            "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
140        );
141        Arc::new(FanoutStore::new(shards)?)
142    } else {
143        match config.storage {
144            StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
145            StorageBackend::TaelBackend => {
146                // WAL replication: when standbys are configured, this node is a
147                // leader that ships every appended record to them before acking
148                // (§5.1). With no standbys the write path is unchanged.
149                let sinks: Vec<Arc<dyn WalSink>> = config
150                    .wal_standbys
151                    .iter()
152                    .map(|url| {
153                        // Stamp the leader epoch (for standby fencing) when a
154                        // coordinator is running; otherwise ship unfenced.
155                        let sink = match &coordinator {
156                            Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
157                            None => RemoteWalSink::new(url),
158                        };
159                        sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
160                    })
161                    .collect::<Result<Vec<_>>>()?;
162                let backend = Arc::new(if sinks.is_empty() {
163                    TaelBackend::new(&config.data_dir)?
164                } else {
165                    tracing::info!(
166                        standbys = sinks.len(),
167                        required_acks = ?config.wal_required_acks,
168                        "WAL replication enabled: shipping to standbys (leader)"
169                    );
170                    TaelBackend::with_wal_key_and_sinks(
171                        &config.data_dir,
172                        "tael-backend",
173                        sinks,
174                        config.wal_required_acks,
175                    )?
176                });
177                search = Some(backend.search_index());
178                spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
179                backend as Arc<dyn Store>
180            }
181        }
182    };
183    let bus = Arc::new(SpanBus::new()?);
184    let log_bus = Arc::new(LogBus::new()?);
185
186    tracing::info!(
187        otlp_grpc = %config.otlp_grpc_addr,
188        rest_api = %config.rest_api_addr,
189        data_dir = %config.data_dir,
190        wal_dir = %config.wal_dir,
191        storage = ?config.storage,
192        "starting tael server"
193    );
194
195    let grpc_handle = tokio::spawn({
196        let store = Arc::clone(&store);
197        let blobs = Arc::clone(&blobs);
198        let bus = Arc::clone(&bus);
199        let log_bus = Arc::clone(&log_bus);
200        let addr = config.otlp_grpc_addr.parse()?;
201        async move {
202            let trace_service = ingest::otlp::OtlpTraceService::new(
203                Arc::clone(&store),
204                Arc::clone(&blobs),
205                search.clone(),
206                bus,
207            );
208            let logs_service = ingest::otlp_logs::OtlpLogsService::new(
209                Arc::clone(&store),
210                Arc::clone(&blobs),
211                log_bus,
212            );
213            let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
214            TonicServer::builder()
215                .add_service(
216                    opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
217                )
218                .add_service(
219                    opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
220                )
221                .add_service(
222                    opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
223                )
224                .serve_with_shutdown(addr, shutdown_signal())
225                .await
226                .expect("gRPC server failed");
227        }
228    });
229
230    let rest_handle = tokio::spawn({
231        let store = Arc::clone(&store);
232        let blobs = Arc::clone(&blobs);
233        let bus = Arc::clone(&bus);
234        let log_bus = Arc::clone(&log_bus);
235        let cluster = coordinator.clone();
236        let addr = config.rest_api_addr.clone();
237        async move {
238            let app = api::rest::router(store, blobs, bus, log_bus, cluster);
239            let listener = TcpListener::bind(&addr)
240                .await
241                .expect("failed to bind REST addr");
242            tracing::info!(%addr, "REST API listening");
243            axum::serve(listener, app)
244                .with_graceful_shutdown(shutdown_signal())
245                .await
246                .expect("REST server failed");
247        }
248    });
249
250    print_startup_banner(&config);
251
252    // Both listeners drain on SIGTERM/Ctrl-C; await both so in-flight requests
253    // finish before we flush and exit (`docs/tael-server-scaling-ha.md` §5.4).
254    let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
255    grpc_res?;
256    rest_res?;
257
258    // Best-effort flush so a restart/standby replays less WAL. Durability is
259    // already guaranteed by the per-write WAL fsync.
260    if let Err(e) = store.flush() {
261        tracing::warn!(error = %e, "flush on shutdown failed");
262    }
263    tracing::info!("tael server stopped");
264
265    Ok(())
266}
267
/// Point walrus-rust's WAL storage root at `wal_dir` by setting the
/// `WALRUS_DATA_DIR` environment variable.
///
/// walrus-rust currently exposes its storage root through the process
/// environment only. Tael owns the server process, so it sets the variable
/// once, before the WAL is opened.
fn configure_walrus_data_dir(wal_dir: &str) {
    // SAFETY: `std::env::set_var` is unsound if another thread reads or writes
    // the process environment at the same time. It is called once, at the top
    // of `run`, before this crate spawns any task that reads env vars (e.g. the
    // maintenance task's `TAEL_*` knobs).
    // NOTE(review): `run` executes inside a tokio runtime that may be
    // multi-threaded, and an embedding binary may own other threads. Confirm
    // nothing else touches the environment at this point.
    unsafe {
        std::env::set_var("WALRUS_DATA_DIR", wal_dir);
    }
}
275
276/// Friendly stdout banner shown on startup so a user running `tael serve`
277/// (with or without `--port`) immediately sees where to connect a CLI and
278/// where to point an OTLP exporter. Goes through `println!` so it's visible
279/// regardless of `RUST_LOG`.
280fn print_startup_banner(config: &ServerConfig) {
281    let rest = &config.rest_api_addr;
282    let otlp = &config.otlp_grpc_addr;
283    let connect_flag = cli_connect_flag(rest);
284
285    println!("tael server starting");
286    println!("  REST API     http://{rest}");
287    println!("  OTLP gRPC    {otlp}");
288    println!("  data dir     {}", config.data_dir);
289    println!("  WAL dir      {}", config.wal_dir);
290    println!("  storage      {:?}", config.storage);
291    println!();
292    println!("Connect a CLI from this machine:");
293    println!("  tael{connect_flag} services");
294    println!("  tael{connect_flag} live");
295    println!();
296    println!("Point a service at this server (OTLP):");
297    println!("  export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
298    println!("  export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
299    println!("  export OTEL_SERVICE_NAME=<your-service>");
300    println!();
301}
302
/// Pick the CLI flag (if any) needed to reach this REST listener.
///
/// - Empty when REST is reachable at the CLI default `127.0.0.1:7701`.
/// - ` --port-rest N` when the host is reachable via that IPv4 default
///   (`127.0.0.1`, `0.0.0.0`, `localhost`) but the port differs.
/// - ` --server http://HOST:PORT` otherwise. Unbracketed IPv6 hosts are
///   bracketed so the URL is valid.
///
/// IPv6 loopback (`::1`) is deliberately *not* treated as local. A listener
/// bound only to `[::1]` cannot be reached at the IPv4 `127.0.0.1` the CLI
/// defaults to.
///
/// The result starts with a space so it can be spliced directly after `tael`.
/// An address with no `:port` yields an empty string.
fn cli_connect_flag(rest_addr: &str) -> String {
    let Some((host, port)) = rest_addr.rsplit_once(':') else {
        return String::new();
    };
    // `0.0.0.0` listens on every IPv4 interface, including loopback.
    // NOTE(review): `localhost` may resolve to `::1` first on some systems;
    // kept as local to match prior behavior.
    let reachable_via_default = matches!(host, "127.0.0.1" | "localhost" | "0.0.0.0");
    match (reachable_via_default, port) {
        (true, "7701") => String::new(),
        (true, p) => format!(" --port-rest {p}"),
        (false, _) if host.contains(':') && !host.starts_with('[') => {
            // Bare IPv6 literal (e.g. `::1:7701`): URLs require brackets.
            format!(" --server http://[{host}]:{port}")
        }
        (false, _) => format!(" --server http://{rest_addr}"),
    }
}
321
322/// Resolve when the process is asked to stop: Ctrl-C, or SIGTERM on Unix
323/// (the orchestrator's graceful-stop signal). Both listeners await their own
324/// copy; the OS delivers the signal to every registered handler.
325async fn shutdown_signal() {
326    let ctrl_c = async {
327        let _ = tokio::signal::ctrl_c().await;
328    };
329
330    #[cfg(unix)]
331    let terminate = async {
332        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
333            Ok(mut s) => {
334                s.recv().await;
335            }
336            Err(e) => {
337                tracing::warn!(error = %e, "failed to install SIGTERM handler");
338                std::future::pending::<()>().await;
339            }
340        }
341    };
342
343    #[cfg(not(unix))]
344    let terminate = std::future::pending::<()>();
345
346    tokio::select! {
347        _ = ctrl_c => {}
348        _ = terminate => {}
349    }
350    tracing::info!("shutdown signal received; draining listeners");
351}