Skip to main content

tael_server/
lib.rs

1//! tael-server: OTLP ingest, tiered storage, and the REST/gRPC query API.
2//!
3//! Shipped as a library so the `tael` binary can embed it behind `tael serve`
4//! (a single `cargo install`), while still being usable as a standalone crate.
5//! [`run`] is the default CLI-style entrypoint; [`run_embedded`] starts the same
6//! server in quiet mode for in-process integrations. [`ServerConfig`] configures
7//! the listeners and storage.
8
9mod api;
10mod cluster;
11mod config;
12mod ingest;
13mod log_bus;
14mod promql;
15mod span_bus;
16mod storage;
17
18use std::sync::Arc;
19
20use anyhow::{Context, Result, bail};
21use tokio::net::TcpListener;
22use tonic::transport::Server as TonicServer;
23use tracing_subscriber::EnvFilter;
24
25pub use config::{ServerConfig, StorageBackend};
26pub use storage::models::{
27    LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
28    TraceQuery,
29};
30#[cfg(feature = "duckdb")]
31pub use storage::DuckDbStore;
32pub use storage::{
33    BlobStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
34};
35
36use log_bus::LogBus;
37use span_bus::SpanBus;
38
/// Controls output that the tael-server library owns directly.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ServerOutputMode {
    /// Install the default tracing subscriber when possible and print the
    /// startup banner to stdout. This is the right mode for `tael serve`, and
    /// it is the [`Default`] value.
    #[default]
    Default,
    /// Do not install a tracing subscriber and do not print the startup banner.
    /// Existing application-level tracing subscribers may still receive Tael
    /// events; this only prevents the library from claiming stdout/stderr on
    /// its own.
    Quiet,
}

/// Options for running the server process beyond listener/storage config.
///
/// `ServerRunOptions::default()` selects [`ServerOutputMode::Default`];
/// [`ServerRunOptions::quiet`] selects [`ServerOutputMode::Quiet`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ServerRunOptions {
    /// Which Tael-owned output (tracing subscriber, startup banner) to emit.
    pub output: ServerOutputMode,
}

impl ServerRunOptions {
    /// Quiet options for embedding Tael inside another CLI/TUI process.
    #[must_use]
    pub fn quiet() -> Self {
        Self {
            output: ServerOutputMode::Quiet,
        }
    }

    /// True when Tael must not install a subscriber or print its banner.
    fn is_quiet(self) -> bool {
        self.output == ServerOutputMode::Quiet
    }
}
78
79/// Periodically roll spans older than the hot-tier window into the cold tier.
80/// Runs the (blocking) compaction off the async executor. The window
81/// (`retention.traces.hot_tier`, default 24h) and interval are env-tunable
82/// (`TAEL_HOT_TIER_HOURS`, `TAEL_COMPACT_INTERVAL_SECS`) until retention config
83/// lands (Phase 7); a 0-hour window compacts everything (used in tests).
84fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
85    let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
86        .ok()
87        .and_then(|s| s.parse().ok())
88        .unwrap_or(24);
89    let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
90        .ok()
91        .and_then(|s| s.parse().ok())
92        .unwrap_or(3600);
93    // Span metadata retention (`retention.traces.metadata`, default 365d).
94    let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
95        .ok()
96        .and_then(|s| s.parse().ok())
97        .unwrap_or(365);
98    tokio::spawn(async move {
99        let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
100        loop {
101            tick.tick().await;
102            let backend = Arc::clone(&backend);
103            let blobs = Arc::clone(&blobs);
104            let result = tokio::task::spawn_blocking(move || {
105                let now = chrono::Utc::now();
106                let hot_cutoff = now - chrono::Duration::hours(window_hours);
107                let mut compacted = backend.compact_spans(hot_cutoff)?;
108                compacted += backend.compact_logs_metrics(hot_cutoff)?;
109                let dropped =
110                    backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
111                // Payload blob GC: drop blobs no live row references (e.g. rows
112                // just removed by retention). Runs after partition drops.
113                let live = backend.collect_live_blob_hashes()?;
114                let blobs_gcd = blobs.gc(&live)?;
115                anyhow::Ok((compacted, dropped, blobs_gcd))
116            })
117            .await;
118            match result {
119                Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
120                    compacted = c,
121                    partitions_dropped = d,
122                    blobs_gcd = g,
123                    "tael-backend maintenance"
124                ),
125                Ok(Ok(_)) => {}
126                Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
127                Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
128            }
129        }
130    });
131}
132
133/// Start the server with the default user-facing output behavior.
134///
135/// This is the right entrypoint for binaries such as `tael serve`: it installs a
136/// default tracing subscriber if the process has not already done so and prints
137/// a startup banner to stdout.
138pub async fn run(config: ServerConfig) -> Result<()> {
139    run_with_options(config, ServerRunOptions::default()).await
140}
141
142/// Start the server in quiet mode for in-process integrations.
143///
144/// Quiet mode avoids Tael-owned stdout/stderr setup so one-shot commands and
145/// TUIs embedding the server can preserve their own output contract.
146pub async fn run_embedded(config: ServerConfig) -> Result<()> {
147    run_with_options(config, ServerRunOptions::quiet()).await
148}
149
150/// Start the server with explicit run options.
151///
152/// Runs until both listeners receive shutdown. The configured storage backend is
153/// shared by OTLP ingest and REST query APIs, with the background maintenance
154/// task enabled when running on tael-backend.
155pub async fn run_with_options(config: ServerConfig, options: ServerRunOptions) -> Result<()> {
156    // Initialize tracing for the server process in the default CLI mode.
157    // `try_init` keeps embedding in a binary that already set a subscriber from
158    // panicking. Quiet mode leaves all tracing ownership to the host process.
159    if !options.is_quiet() {
160        let _ = tracing_subscriber::fmt()
161            .with_env_filter(EnvFilter::from_default_env())
162            .try_init();
163    }
164
165    configure_walrus_data_dir(&config.wal_dir);
166
167    let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
168
169    // Cluster coordination (chitchat): automatic leader election + epoch fencing
170    // of WAL replication (§5.1). On when TAEL_CLUSTER_LISTEN is set.
171    let coordinator = match &config.cluster {
172        Some(cs) => {
173            let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
174                node_id: cs.node_id.clone(),
175                listen_addr: cs
176                    .listen_addr
177                    .parse()
178                    .context("parsing TAEL_CLUSTER_LISTEN")?,
179                advertise_addr: cs
180                    .advertise_addr
181                    .parse()
182                    .context("parsing TAEL_CLUSTER_ADVERTISE")?,
183                seeds: cs.seeds.clone(),
184                cluster_id: cs.cluster_id.clone(),
185            })
186            .await?;
187            Some(coord)
188        }
189        None => None,
190    };
191
192    // The payload search index is shared between the ingest path (writes) and
193    // the tael-backend query path (reads); present only when that engine runs.
194    let mut search: Option<Arc<storage::SearchIndex>> = None;
195    let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
196        // Stateless query-tier mode: serve reads by scatter-gather over remote
197        // shards, no local engine (`docs/tael-server-scaling-ha.md` §3, Phase 2).
198        let shards = config
199            .query_shards
200            .iter()
201            .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
202            .collect::<Result<Vec<_>>>()?;
203        tracing::info!(
204            shards = shards.len(),
205            "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
206        );
207        Arc::new(FanoutStore::new(shards)?)
208    } else {
209        match config.storage {
210            #[cfg(feature = "duckdb")]
211            StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
212            #[cfg(not(feature = "duckdb"))]
213            StorageBackend::Duckdb => {
214                bail!(
215                    "DuckDB storage is not included in this build; reinstall with `--features duckdb` to use --storage duckdb"
216                )
217            }
218            StorageBackend::TaelBackend => {
219                // WAL replication: when standbys are configured, this node is a
220                // leader that ships every appended record to them before acking
221                // (§5.1). With no standbys the write path is unchanged.
222                let sinks: Vec<Arc<dyn WalSink>> = config
223                    .wal_standbys
224                    .iter()
225                    .map(|url| {
226                        // Stamp the leader epoch (for standby fencing) when a
227                        // coordinator is running; otherwise ship unfenced.
228                        let sink = match &coordinator {
229                            Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
230                            None => RemoteWalSink::new(url),
231                        };
232                        sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
233                    })
234                    .collect::<Result<Vec<_>>>()?;
235                let backend = Arc::new(if sinks.is_empty() {
236                    TaelBackend::new(&config.data_dir)?
237                } else {
238                    tracing::info!(
239                        standbys = sinks.len(),
240                        required_acks = ?config.wal_required_acks,
241                        "WAL replication enabled: shipping to standbys (leader)"
242                    );
243                    TaelBackend::with_wal_key_and_sinks(
244                        &config.data_dir,
245                        "tael-backend",
246                        sinks,
247                        config.wal_required_acks,
248                    )?
249                });
250                search = Some(backend.search_index());
251                spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
252                backend as Arc<dyn Store>
253            }
254        }
255    };
256    let bus = Arc::new(SpanBus::new()?);
257    let log_bus = Arc::new(LogBus::new()?);
258
259    tracing::info!(
260        otlp_grpc = %config.otlp_grpc_addr,
261        rest_api = %config.rest_api_addr,
262        rest_api_socket = ?config.rest_api_socket,
263        data_dir = %config.data_dir,
264        wal_dir = %config.wal_dir,
265        storage = ?config.storage,
266        "starting tael server"
267    );
268
269    let grpc_handle = tokio::spawn({
270        let store = Arc::clone(&store);
271        let blobs = Arc::clone(&blobs);
272        let bus = Arc::clone(&bus);
273        let log_bus = Arc::clone(&log_bus);
274        let addr = config.otlp_grpc_addr.parse()?;
275        async move {
276            let trace_service = ingest::otlp::OtlpTraceService::new(
277                Arc::clone(&store),
278                Arc::clone(&blobs),
279                search.clone(),
280                bus,
281            );
282            let logs_service = ingest::otlp_logs::OtlpLogsService::new(
283                Arc::clone(&store),
284                Arc::clone(&blobs),
285                log_bus,
286            );
287            let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
288            TonicServer::builder()
289                .add_service(
290                    opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
291                )
292                .add_service(
293                    opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
294                )
295                .add_service(
296                    opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
297                )
298                .serve_with_shutdown(addr, shutdown_signal())
299                .await
300                .expect("gRPC server failed");
301        }
302    });
303
304    let rest_handle = tokio::spawn({
305        let store = Arc::clone(&store);
306        let blobs = Arc::clone(&blobs);
307        let bus = Arc::clone(&bus);
308        let log_bus = Arc::clone(&log_bus);
309        let cluster = coordinator.clone();
310        let addr = config.rest_api_addr.clone();
311        let socket = config.rest_api_socket.clone();
312        async move {
313            let app = api::rest::router(store, blobs, bus, log_bus, cluster);
314            if let Some(socket) = socket {
315                #[cfg(unix)]
316                {
317                    prepare_unix_socket_path(&socket)?;
318                    let listener = tokio::net::UnixListener::bind(&socket)
319                        .with_context(|| format!("binding REST Unix socket {socket}"))?;
320                    tracing::info!(%socket, "REST API listening on Unix socket");
321                    let result = axum::serve(listener, app)
322                        .with_graceful_shutdown(shutdown_signal())
323                        .await
324                        .context("REST server failed");
325                    cleanup_unix_socket_path(&socket);
326                    result?;
327                }
328                #[cfg(not(unix))]
329                {
330                    bail!("REST Unix sockets are only supported on Unix platforms");
331                }
332            } else {
333                let listener = TcpListener::bind(&addr)
334                    .await
335                    .with_context(|| format!("binding REST addr {addr}"))?;
336                tracing::info!(%addr, "REST API listening");
337                axum::serve(listener, app)
338                    .with_graceful_shutdown(shutdown_signal())
339                    .await
340                    .context("REST server failed")?;
341            }
342            Ok::<(), anyhow::Error>(())
343        }
344    });
345
346    if !options.is_quiet() {
347        print_startup_banner(&config);
348    }
349
350    // Both listeners drain on SIGTERM/Ctrl-C; await both so in-flight requests
351    // finish before we flush and exit (`docs/tael-server-scaling-ha.md` §5.4).
352    let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
353    grpc_res?;
354    rest_res??;
355
356    // Best-effort flush so a restart/standby replays less WAL. Durability is
357    // already guaranteed by the per-write WAL fsync.
358    if let Err(e) = store.flush() {
359        tracing::warn!(error = %e, "flush on shutdown failed");
360    }
361    tracing::info!("tael server stopped");
362
363    Ok(())
364}
365
/// Point the walrus WAL at `wal_dir` through the `WALRUS_DATA_DIR` variable.
///
/// walrus-rust currently exposes its storage root only through the process
/// environment, so Tael sets it once before opening the WAL. This mutates
/// process-global state, which an embedding host will also observe.
fn configure_walrus_data_dir(wal_dir: &str) {
    const WALRUS_DATA_DIR_VAR: &str = "WALRUS_DATA_DIR";
    // SAFETY: `set_var` is only sound while no other thread reads or writes
    // the environment. This runs once during startup, before the WAL is
    // opened. NOTE(review): it already executes inside the async runtime,
    // where worker threads may exist. Setting it before the runtime starts, or
    // a walrus API that accepts the path directly, would close that gap.
    unsafe { std::env::set_var(WALRUS_DATA_DIR_VAR, wal_dir) };
}
373
374#[cfg(unix)]
375fn prepare_unix_socket_path(socket: &str) -> Result<()> {
376    use std::os::unix::fs::FileTypeExt;
377
378    let path = std::path::Path::new(socket);
379    if let Some(parent) = path.parent()
380        && !parent.as_os_str().is_empty()
381    {
382        std::fs::create_dir_all(parent)
383            .with_context(|| format!("creating REST socket directory {}", parent.display()))?;
384    }
385
386    match std::fs::symlink_metadata(path) {
387        Ok(meta) if meta.file_type().is_socket() => {
388            bail!(
389                "REST Unix socket path already exists: {}. Remove it if no server is running.",
390                path.display()
391            );
392        }
393        Ok(_) => {
394            bail!(
395                "REST Unix socket path exists and is not a socket: {}",
396                path.display()
397            );
398        }
399        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
400        Err(e) => Err(e).with_context(|| format!("checking REST socket path {}", path.display())),
401    }
402}
403
404#[cfg(unix)]
405fn cleanup_unix_socket_path(socket: &str) {
406    use std::os::unix::fs::FileTypeExt;
407
408    let path = std::path::Path::new(socket);
409    match std::fs::symlink_metadata(path) {
410        Ok(meta) if meta.file_type().is_socket() => {
411            if let Err(e) = std::fs::remove_file(path) {
412                tracing::warn!(socket = %path.display(), error = %e, "failed to remove REST Unix socket");
413            }
414        }
415        Ok(_) | Err(_) => {}
416    }
417}
418
419/// Friendly stdout banner shown on startup so a user running `tael serve`
420/// (with or without `--port`) immediately sees where to connect a CLI and
421/// where to point an OTLP exporter. Goes through `println!` so it's visible
422/// regardless of `RUST_LOG`.
423fn print_startup_banner(config: &ServerConfig) {
424    let rest = rest_endpoint_label(config);
425    let otlp = &config.otlp_grpc_addr;
426    let connect_flag = cli_connect_flag(config);
427
428    println!("tael server starting");
429    println!("  REST API     {rest}");
430    println!("  OTLP gRPC    {otlp}");
431    println!("  data dir     {}", config.data_dir);
432    println!("  WAL dir      {}", config.wal_dir);
433    println!("  storage      {:?}", config.storage);
434    println!();
435    println!("Connect a CLI from this machine:");
436    println!("  tael{connect_flag} services");
437    println!("  tael{connect_flag} live");
438    println!();
439    println!("Point a service at this server (OTLP):");
440    println!("  export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
441    println!("  export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
442    println!("  export OTEL_SERVICE_NAME=<your-service>");
443    println!();
444}
445
446/// Pick the CLI flag (if any) needed to reach this REST listener. Empty when
447/// REST is on the CLI default `127.0.0.1:7701`; `--port-rest N` when only the
448/// port differs; full `--server …` otherwise.
449fn cli_connect_flag(config: &ServerConfig) -> String {
450    if let Some(socket) = &config.rest_api_socket {
451        return format!(" --unix-socket {socket}");
452    }
453
454    let rest_addr = &config.rest_api_addr;
455    let (host, port) = match rest_addr.rsplit_once(':') {
456        Some((h, p)) => (h, p),
457        None => return String::new(),
458    };
459    let local = matches!(
460        host,
461        "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]"
462    );
463    match (local, port) {
464        (true, "7701") => String::new(),
465        (true, p) => format!(" --port-rest {p}"),
466        (false, _) => format!(" --server http://{rest_addr}"),
467    }
468}
469
470fn rest_endpoint_label(config: &ServerConfig) -> String {
471    match &config.rest_api_socket {
472        Some(socket) => format!("unix://{socket}"),
473        None => format!("http://{}", config.rest_api_addr),
474    }
475}
476
477/// Resolve when the process is asked to stop: Ctrl-C, or SIGTERM on Unix
478/// (the orchestrator's graceful-stop signal). Both listeners await their own
479/// copy; the OS delivers the signal to every registered handler.
480async fn shutdown_signal() {
481    let ctrl_c = async {
482        let _ = tokio::signal::ctrl_c().await;
483    };
484
485    #[cfg(unix)]
486    let terminate = async {
487        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
488            Ok(mut s) => {
489                s.recv().await;
490            }
491            Err(e) => {
492                tracing::warn!(error = %e, "failed to install SIGTERM handler");
493                std::future::pending::<()>().await;
494            }
495        }
496    };
497
498    #[cfg(not(unix))]
499    let terminate = std::future::pending::<()>();
500
501    tokio::select! {
502        _ = ctrl_c => {}
503        _ = terminate => {}
504    }
505    tracing::info!("shutdown signal received; draining listeners");
506}