Skip to main content

tael_server/
lib.rs

1//! tael-server: OTLP ingest, tiered storage, and the REST/gRPC query API.
2//!
3//! Shipped as a library so the `tael` binary can embed it behind `tael serve`
4//! (a single `cargo install`), while still being usable as a standalone crate.
5//! [`run`] is the default CLI-style entrypoint; [`run_embedded`] starts the same
6//! server in quiet mode for in-process integrations. [`ServerConfig`] configures
7//! the listeners and storage.
8
9mod api;
10mod cluster;
11mod config;
12mod ingest;
13mod log_bus;
14mod promql;
15mod span_bus;
16mod storage;
17
18use std::sync::Arc;
19
20use anyhow::{Context, Result, bail};
21use tokio::net::TcpListener;
22use tonic::transport::Server as TonicServer;
23use tracing_subscriber::EnvFilter;
24
25pub use config::{ServerConfig, StorageBackend};
26pub use storage::models::{
27    LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
28    TraceQuery,
29};
30pub use storage::{
31    BlobStore, DuckDbStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
32};
33
34use log_bus::LogBus;
35use span_bus::SpanBus;
36
/// Who owns process-level output (tracing subscriber, stdout banner) when the
/// tael server library starts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerOutputMode {
    /// Interactive CLI behaviour, used by `tael serve`: install the default
    /// tracing subscriber if none is set yet and print the startup banner to
    /// stdout.
    Default,
    /// Embedded behaviour: install no tracing subscriber and print no banner.
    /// A subscriber the host application already installed may still receive
    /// Tael's tracing events; the library simply never claims stdout/stderr
    /// for itself.
    Quiet,
}

/// Run-time knobs for the server process that sit outside the listener and
/// storage settings of the server config.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServerRunOptions {
    /// How much process-level output the library is allowed to own.
    pub output: ServerOutputMode,
}

impl ServerRunOptions {
    /// Options for hosting Tael inside another CLI/TUI process: nothing is
    /// printed and no tracing subscriber is installed.
    pub fn quiet() -> Self {
        Self {
            output: ServerOutputMode::Quiet,
        }
    }

    /// `true` when the library must leave stdout/stderr alone.
    fn is_quiet(self) -> bool {
        self.output == ServerOutputMode::Quiet
    }
}

impl Default for ServerRunOptions {
    /// Interactive CLI output (`ServerOutputMode::Default`).
    fn default() -> Self {
        Self {
            output: ServerOutputMode::Default,
        }
    }
}
76
77/// Periodically roll spans older than the hot-tier window into the cold tier.
78/// Runs the (blocking) compaction off the async executor. The window
79/// (`retention.traces.hot_tier`, default 24h) and interval are env-tunable
80/// (`TAEL_HOT_TIER_HOURS`, `TAEL_COMPACT_INTERVAL_SECS`) until retention config
81/// lands (Phase 7); a 0-hour window compacts everything (used in tests).
82fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
83    let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
84        .ok()
85        .and_then(|s| s.parse().ok())
86        .unwrap_or(24);
87    let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
88        .ok()
89        .and_then(|s| s.parse().ok())
90        .unwrap_or(3600);
91    // Span metadata retention (`retention.traces.metadata`, default 365d).
92    let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
93        .ok()
94        .and_then(|s| s.parse().ok())
95        .unwrap_or(365);
96    tokio::spawn(async move {
97        let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
98        loop {
99            tick.tick().await;
100            let backend = Arc::clone(&backend);
101            let blobs = Arc::clone(&blobs);
102            let result = tokio::task::spawn_blocking(move || {
103                let now = chrono::Utc::now();
104                let hot_cutoff = now - chrono::Duration::hours(window_hours);
105                let mut compacted = backend.compact_spans(hot_cutoff)?;
106                compacted += backend.compact_logs_metrics(hot_cutoff)?;
107                let dropped =
108                    backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
109                // Payload blob GC: drop blobs no live row references (e.g. rows
110                // just removed by retention). Runs after partition drops.
111                let live = backend.collect_live_blob_hashes()?;
112                let blobs_gcd = blobs.gc(&live)?;
113                anyhow::Ok((compacted, dropped, blobs_gcd))
114            })
115            .await;
116            match result {
117                Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
118                    compacted = c,
119                    partitions_dropped = d,
120                    blobs_gcd = g,
121                    "tael-backend maintenance"
122                ),
123                Ok(Ok(_)) => {}
124                Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
125                Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
126            }
127        }
128    });
129}
130
131/// Start the server with the default user-facing output behavior.
132///
133/// This is the right entrypoint for binaries such as `tael serve`: it installs a
134/// default tracing subscriber if the process has not already done so and prints
135/// a startup banner to stdout.
136pub async fn run(config: ServerConfig) -> Result<()> {
137    run_with_options(config, ServerRunOptions::default()).await
138}
139
140/// Start the server in quiet mode for in-process integrations.
141///
142/// Quiet mode avoids Tael-owned stdout/stderr setup so one-shot commands and
143/// TUIs embedding the server can preserve their own output contract.
144pub async fn run_embedded(config: ServerConfig) -> Result<()> {
145    run_with_options(config, ServerRunOptions::quiet()).await
146}
147
148/// Start the server with explicit run options.
149///
150/// Runs until both listeners receive shutdown. The configured storage backend is
151/// shared by OTLP ingest and REST query APIs, with the background maintenance
152/// task enabled when running on tael-backend.
153pub async fn run_with_options(config: ServerConfig, options: ServerRunOptions) -> Result<()> {
154    // Initialize tracing for the server process in the default CLI mode.
155    // `try_init` keeps embedding in a binary that already set a subscriber from
156    // panicking. Quiet mode leaves all tracing ownership to the host process.
157    if !options.is_quiet() {
158        let _ = tracing_subscriber::fmt()
159            .with_env_filter(EnvFilter::from_default_env())
160            .try_init();
161    }
162
163    configure_walrus_data_dir(&config.wal_dir);
164
165    let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
166
167    // Cluster coordination (chitchat): automatic leader election + epoch fencing
168    // of WAL replication (§5.1). On when TAEL_CLUSTER_LISTEN is set.
169    let coordinator = match &config.cluster {
170        Some(cs) => {
171            let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
172                node_id: cs.node_id.clone(),
173                listen_addr: cs
174                    .listen_addr
175                    .parse()
176                    .context("parsing TAEL_CLUSTER_LISTEN")?,
177                advertise_addr: cs
178                    .advertise_addr
179                    .parse()
180                    .context("parsing TAEL_CLUSTER_ADVERTISE")?,
181                seeds: cs.seeds.clone(),
182                cluster_id: cs.cluster_id.clone(),
183            })
184            .await?;
185            Some(coord)
186        }
187        None => None,
188    };
189
190    // The payload search index is shared between the ingest path (writes) and
191    // the tael-backend query path (reads); present only when that engine runs.
192    let mut search: Option<Arc<storage::SearchIndex>> = None;
193    let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
194        // Stateless query-tier mode: serve reads by scatter-gather over remote
195        // shards, no local engine (`docs/tael-server-scaling-ha.md` §3, Phase 2).
196        let shards = config
197            .query_shards
198            .iter()
199            .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
200            .collect::<Result<Vec<_>>>()?;
201        tracing::info!(
202            shards = shards.len(),
203            "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
204        );
205        Arc::new(FanoutStore::new(shards)?)
206    } else {
207        match config.storage {
208            StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
209            StorageBackend::TaelBackend => {
210                // WAL replication: when standbys are configured, this node is a
211                // leader that ships every appended record to them before acking
212                // (§5.1). With no standbys the write path is unchanged.
213                let sinks: Vec<Arc<dyn WalSink>> = config
214                    .wal_standbys
215                    .iter()
216                    .map(|url| {
217                        // Stamp the leader epoch (for standby fencing) when a
218                        // coordinator is running; otherwise ship unfenced.
219                        let sink = match &coordinator {
220                            Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
221                            None => RemoteWalSink::new(url),
222                        };
223                        sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
224                    })
225                    .collect::<Result<Vec<_>>>()?;
226                let backend = Arc::new(if sinks.is_empty() {
227                    TaelBackend::new(&config.data_dir)?
228                } else {
229                    tracing::info!(
230                        standbys = sinks.len(),
231                        required_acks = ?config.wal_required_acks,
232                        "WAL replication enabled: shipping to standbys (leader)"
233                    );
234                    TaelBackend::with_wal_key_and_sinks(
235                        &config.data_dir,
236                        "tael-backend",
237                        sinks,
238                        config.wal_required_acks,
239                    )?
240                });
241                search = Some(backend.search_index());
242                spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
243                backend as Arc<dyn Store>
244            }
245        }
246    };
247    let bus = Arc::new(SpanBus::new()?);
248    let log_bus = Arc::new(LogBus::new()?);
249
250    tracing::info!(
251        otlp_grpc = %config.otlp_grpc_addr,
252        rest_api = %config.rest_api_addr,
253        rest_api_socket = ?config.rest_api_socket,
254        data_dir = %config.data_dir,
255        wal_dir = %config.wal_dir,
256        storage = ?config.storage,
257        "starting tael server"
258    );
259
260    let grpc_handle = tokio::spawn({
261        let store = Arc::clone(&store);
262        let blobs = Arc::clone(&blobs);
263        let bus = Arc::clone(&bus);
264        let log_bus = Arc::clone(&log_bus);
265        let addr = config.otlp_grpc_addr.parse()?;
266        async move {
267            let trace_service = ingest::otlp::OtlpTraceService::new(
268                Arc::clone(&store),
269                Arc::clone(&blobs),
270                search.clone(),
271                bus,
272            );
273            let logs_service = ingest::otlp_logs::OtlpLogsService::new(
274                Arc::clone(&store),
275                Arc::clone(&blobs),
276                log_bus,
277            );
278            let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
279            TonicServer::builder()
280                .add_service(
281                    opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
282                )
283                .add_service(
284                    opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
285                )
286                .add_service(
287                    opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
288                )
289                .serve_with_shutdown(addr, shutdown_signal())
290                .await
291                .expect("gRPC server failed");
292        }
293    });
294
295    let rest_handle = tokio::spawn({
296        let store = Arc::clone(&store);
297        let blobs = Arc::clone(&blobs);
298        let bus = Arc::clone(&bus);
299        let log_bus = Arc::clone(&log_bus);
300        let cluster = coordinator.clone();
301        let addr = config.rest_api_addr.clone();
302        let socket = config.rest_api_socket.clone();
303        async move {
304            let app = api::rest::router(store, blobs, bus, log_bus, cluster);
305            if let Some(socket) = socket {
306                #[cfg(unix)]
307                {
308                    prepare_unix_socket_path(&socket)?;
309                    let listener = tokio::net::UnixListener::bind(&socket)
310                        .with_context(|| format!("binding REST Unix socket {socket}"))?;
311                    tracing::info!(%socket, "REST API listening on Unix socket");
312                    let result = axum::serve(listener, app)
313                        .with_graceful_shutdown(shutdown_signal())
314                        .await
315                        .context("REST server failed");
316                    cleanup_unix_socket_path(&socket);
317                    result?;
318                }
319                #[cfg(not(unix))]
320                {
321                    bail!("REST Unix sockets are only supported on Unix platforms");
322                }
323            } else {
324                let listener = TcpListener::bind(&addr)
325                    .await
326                    .with_context(|| format!("binding REST addr {addr}"))?;
327                tracing::info!(%addr, "REST API listening");
328                axum::serve(listener, app)
329                    .with_graceful_shutdown(shutdown_signal())
330                    .await
331                    .context("REST server failed")?;
332            }
333            Ok::<(), anyhow::Error>(())
334        }
335    });
336
337    if !options.is_quiet() {
338        print_startup_banner(&config);
339    }
340
341    // Both listeners drain on SIGTERM/Ctrl-C; await both so in-flight requests
342    // finish before we flush and exit (`docs/tael-server-scaling-ha.md` §5.4).
343    let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
344    grpc_res?;
345    rest_res??;
346
347    // Best-effort flush so a restart/standby replays less WAL. Durability is
348    // already guaranteed by the per-write WAL fsync.
349    if let Err(e) = store.flush() {
350        tracing::warn!(error = %e, "flush on shutdown failed");
351    }
352    tracing::info!("tael server stopped");
353
354    Ok(())
355}
356
/// Point the walrus WAL at `wal_dir` through the `WALRUS_DATA_DIR` variable.
///
/// walrus-rust currently exposes its storage root through the process
/// environment only. The write is skipped when the variable already holds this
/// exact value, for example when the host set it itself or on a repeated
/// embedded start. That way the unsafe environment mutation only happens when
/// it is actually needed.
fn configure_walrus_data_dir(wal_dir: &str) {
    if std::env::var_os("WALRUS_DATA_DIR").as_deref() == Some(std::ffi::OsStr::new(wal_dir)) {
        return;
    }
    // SAFETY: `set_var` is only sound while no other thread reads or writes
    // the process environment. Tael calls this during server startup, before
    // the WAL is opened. NOTE(review): tokio worker threads (and, when
    // embedded, host threads) may already exist; they must not touch the
    // environment concurrently.
    unsafe {
        std::env::set_var("WALRUS_DATA_DIR", wal_dir);
    }
}
364
365#[cfg(unix)]
366fn prepare_unix_socket_path(socket: &str) -> Result<()> {
367    use std::os::unix::fs::FileTypeExt;
368
369    let path = std::path::Path::new(socket);
370    if let Some(parent) = path.parent()
371        && !parent.as_os_str().is_empty()
372    {
373        std::fs::create_dir_all(parent)
374            .with_context(|| format!("creating REST socket directory {}", parent.display()))?;
375    }
376
377    match std::fs::symlink_metadata(path) {
378        Ok(meta) if meta.file_type().is_socket() => {
379            bail!(
380                "REST Unix socket path already exists: {}. Remove it if no server is running.",
381                path.display()
382            );
383        }
384        Ok(_) => {
385            bail!(
386                "REST Unix socket path exists and is not a socket: {}",
387                path.display()
388            );
389        }
390        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
391        Err(e) => Err(e).with_context(|| format!("checking REST socket path {}", path.display())),
392    }
393}
394
395#[cfg(unix)]
396fn cleanup_unix_socket_path(socket: &str) {
397    use std::os::unix::fs::FileTypeExt;
398
399    let path = std::path::Path::new(socket);
400    match std::fs::symlink_metadata(path) {
401        Ok(meta) if meta.file_type().is_socket() => {
402            if let Err(e) = std::fs::remove_file(path) {
403                tracing::warn!(socket = %path.display(), error = %e, "failed to remove REST Unix socket");
404            }
405        }
406        Ok(_) | Err(_) => {}
407    }
408}
409
410/// Friendly stdout banner shown on startup so a user running `tael serve`
411/// (with or without `--port`) immediately sees where to connect a CLI and
412/// where to point an OTLP exporter. Goes through `println!` so it's visible
413/// regardless of `RUST_LOG`.
414fn print_startup_banner(config: &ServerConfig) {
415    let rest = rest_endpoint_label(config);
416    let otlp = &config.otlp_grpc_addr;
417    let connect_flag = cli_connect_flag(config);
418
419    println!("tael server starting");
420    println!("  REST API     {rest}");
421    println!("  OTLP gRPC    {otlp}");
422    println!("  data dir     {}", config.data_dir);
423    println!("  WAL dir      {}", config.wal_dir);
424    println!("  storage      {:?}", config.storage);
425    println!();
426    println!("Connect a CLI from this machine:");
427    println!("  tael{connect_flag} services");
428    println!("  tael{connect_flag} live");
429    println!();
430    println!("Point a service at this server (OTLP):");
431    println!("  export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
432    println!("  export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
433    println!("  export OTEL_SERVICE_NAME=<your-service>");
434    println!();
435}
436
437/// Pick the CLI flag (if any) needed to reach this REST listener. Empty when
438/// REST is on the CLI default `127.0.0.1:7701`; `--port-rest N` when only the
439/// port differs; full `--server …` otherwise.
440fn cli_connect_flag(config: &ServerConfig) -> String {
441    if let Some(socket) = &config.rest_api_socket {
442        return format!(" --unix-socket {socket}");
443    }
444
445    let rest_addr = &config.rest_api_addr;
446    let (host, port) = match rest_addr.rsplit_once(':') {
447        Some((h, p)) => (h, p),
448        None => return String::new(),
449    };
450    let local = matches!(
451        host,
452        "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]"
453    );
454    match (local, port) {
455        (true, "7701") => String::new(),
456        (true, p) => format!(" --port-rest {p}"),
457        (false, _) => format!(" --server http://{rest_addr}"),
458    }
459}
460
461fn rest_endpoint_label(config: &ServerConfig) -> String {
462    match &config.rest_api_socket {
463        Some(socket) => format!("unix://{socket}"),
464        None => format!("http://{}", config.rest_api_addr),
465    }
466}
467
468/// Resolve when the process is asked to stop: Ctrl-C, or SIGTERM on Unix
469/// (the orchestrator's graceful-stop signal). Both listeners await their own
470/// copy; the OS delivers the signal to every registered handler.
471async fn shutdown_signal() {
472    let ctrl_c = async {
473        let _ = tokio::signal::ctrl_c().await;
474    };
475
476    #[cfg(unix)]
477    let terminate = async {
478        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
479            Ok(mut s) => {
480                s.recv().await;
481            }
482            Err(e) => {
483                tracing::warn!(error = %e, "failed to install SIGTERM handler");
484                std::future::pending::<()>().await;
485            }
486        }
487    };
488
489    #[cfg(not(unix))]
490    let terminate = std::future::pending::<()>();
491
492    tokio::select! {
493        _ = ctrl_c => {}
494        _ = terminate => {}
495    }
496    tracing::info!("shutdown signal received; draining listeners");
497}