Skip to main content

tael_server/
lib.rs

1//! tael-server: OTLP ingest, tiered storage, and the REST/gRPC query API.
2//!
3//! Shipped as a library so the `tael` binary can embed it behind `tael serve`
4//! (a single `cargo install`), while still being usable as a standalone crate.
5//! [`run`] is the default CLI-style entrypoint; [`run_embedded`] starts the same
6//! server in quiet mode for in-process integrations. [`ServerConfig`] configures
7//! the listeners and storage.
8
9mod api;
10mod cluster;
11mod config;
12mod ingest;
13mod log_bus;
14mod promql;
15mod span_bus;
16mod storage;
17
18use std::path::Path;
19use std::sync::Arc;
20
21use anyhow::{Context, Result, bail};
22use tokio::net::TcpListener;
23use tonic::transport::Server as TonicServer;
24use tracing_subscriber::EnvFilter;
25
26pub use config::{ServerConfig, StorageBackend};
27pub use storage::models::{
28    LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
29    TraceQuery,
30};
31#[cfg(feature = "duckdb")]
32pub use storage::DuckDbStore;
33pub use storage::{
34    BlobStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
35};
36use storage::{StoreLocation, open_comments, open_object_backend};
37
38use log_bus::LogBus;
39use span_bus::SpanBus;
40
/// Selects which process-level output the tael-server library produces itself.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ServerOutputMode {
    /// The library tries to install its default tracing subscriber and prints
    /// the startup banner to stdout. Intended for `tael serve`.
    Default,
    /// The library installs no tracing subscriber and prints no startup
    /// banner. A subscriber already installed by the host application may
    /// still receive Tael events; this mode only stops the library from
    /// claiming stdout/stderr on its own.
    Quiet,
}
53
54/// Options for running the server process beyond listener/storage config.
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct ServerRunOptions {
57    pub output: ServerOutputMode,
58}
59
60impl Default for ServerRunOptions {
61    fn default() -> Self {
62        Self {
63            output: ServerOutputMode::Default,
64        }
65    }
66}
67
68impl ServerRunOptions {
69    /// Quiet options for embedding Tael inside another CLI/TUI process.
70    pub fn quiet() -> Self {
71        Self {
72            output: ServerOutputMode::Quiet,
73        }
74    }
75
76    fn is_quiet(self) -> bool {
77        matches!(self.output, ServerOutputMode::Quiet)
78    }
79}
80
81/// Periodically roll spans older than the hot-tier window into the cold tier.
82/// Runs the (blocking) compaction off the async executor. The window
83/// (`retention.traces.hot_tier`, default 24h) and interval are env-tunable
84/// (`TAEL_HOT_TIER_HOURS`, `TAEL_COMPACT_INTERVAL_SECS`) until retention config
85/// lands (Phase 7); a 0-hour window compacts everything (used in tests).
86fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>, blob_gc_enabled: bool) {
87    let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
88        .ok()
89        .and_then(|s| s.parse().ok())
90        .unwrap_or(24);
91    let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
92        .ok()
93        .and_then(|s| s.parse().ok())
94        .unwrap_or(3600);
95    // Span metadata retention (`retention.traces.metadata`, default 365d).
96    let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
97        .ok()
98        .and_then(|s| s.parse().ok())
99        .unwrap_or(365);
100    tokio::spawn(async move {
101        let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
102        loop {
103            tick.tick().await;
104            let backend = Arc::clone(&backend);
105            let blobs = Arc::clone(&blobs);
106            let result = tokio::task::spawn_blocking(move || {
107                let now = chrono::Utc::now();
108                let hot_cutoff = now - chrono::Duration::hours(window_hours);
109                let mut compacted = backend.compact_spans(hot_cutoff)?;
110                compacted += backend.compact_logs_metrics(hot_cutoff)?;
111                let dropped =
112                    backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
113                // Payload blob GC: drop blobs no live row references (e.g. rows
114                // just removed by retention). Runs after partition drops. Skipped
115                // when this node doesn't own GC over a shared blob store (the
116                // single-owner guard), to avoid deleting blobs other shards
117                // reference.
118                let blobs_gcd = if blob_gc_enabled {
119                    let live = backend.collect_live_blob_hashes()?;
120                    blobs.gc(&live)?
121                } else {
122                    0
123                };
124                anyhow::Ok((compacted, dropped, blobs_gcd))
125            })
126            .await;
127            match result {
128                Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
129                    compacted = c,
130                    partitions_dropped = d,
131                    blobs_gcd = g,
132                    "tael-backend maintenance"
133                ),
134                Ok(Ok(_)) => {}
135                Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
136                Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
137            }
138        }
139    });
140}
141
142/// Start the server with the default user-facing output behavior.
143///
144/// This is the right entrypoint for binaries such as `tael serve`: it installs a
145/// default tracing subscriber if the process has not already done so and prints
146/// a startup banner to stdout.
147pub async fn run(config: ServerConfig) -> Result<()> {
148    run_with_options(config, ServerRunOptions::default()).await
149}
150
151/// Start the server in quiet mode for in-process integrations.
152///
153/// Quiet mode avoids Tael-owned stdout/stderr setup so one-shot commands and
154/// TUIs embedding the server can preserve their own output contract.
155pub async fn run_embedded(config: ServerConfig) -> Result<()> {
156    run_with_options(config, ServerRunOptions::quiet()).await
157}
158
159/// Start the server with explicit run options.
160///
161/// Runs until both listeners receive shutdown. The configured storage backend is
162/// shared by OTLP ingest and REST query APIs, with the background maintenance
163/// task enabled when running on tael-backend.
164pub async fn run_with_options(config: ServerConfig, options: ServerRunOptions) -> Result<()> {
165    // Initialize tracing for the server process in the default CLI mode.
166    // `try_init` keeps embedding in a binary that already set a subscriber from
167    // panicking. Quiet mode leaves all tracing ownership to the host process.
168    if !options.is_quiet() {
169        let _ = tracing_subscriber::fmt()
170            .with_env_filter(EnvFilter::from_default_env())
171            .try_init();
172    }
173
174    configure_walrus_data_dir(&config.wal_dir);
175
176    // Blob store: local filesystem by default; GCS when configured (opt-in,
177    // requires the `cloud` feature — otherwise this fails loudly).
178    let blobs = Arc::new(match config.object_store.blobs {
179        StoreLocation::Fs => BlobStore::new(&config.data_dir)?,
180        StoreLocation::Gcs => {
181            let backend = open_object_backend(
182                StoreLocation::Gcs,
183                Path::new(&config.data_dir).join("blobs").as_path(),
184                config.object_store.blob_bucket.as_deref(),
185            )?;
186            BlobStore::with_backend(backend)?
187        }
188    });
189
190    // Cluster coordination (chitchat): automatic leader election + epoch fencing
191    // of WAL replication (§5.1). On when TAEL_CLUSTER_LISTEN is set.
192    let coordinator = match &config.cluster {
193        Some(cs) => {
194            let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
195                node_id: cs.node_id.clone(),
196                listen_addr: cs
197                    .listen_addr
198                    .parse()
199                    .context("parsing TAEL_CLUSTER_LISTEN")?,
200                advertise_addr: cs
201                    .advertise_addr
202                    .parse()
203                    .context("parsing TAEL_CLUSTER_ADVERTISE")?,
204                seeds: cs.seeds.clone(),
205                cluster_id: cs.cluster_id.clone(),
206            })
207            .await?;
208            Some(coord)
209        }
210        None => None,
211    };
212
213    // The payload search index is shared between the ingest path (writes) and
214    // the tael-backend query path (reads); present only when that engine runs.
215    let mut search: Option<Arc<storage::SearchIndex>> = None;
216    let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
217        // Stateless query-tier mode: serve reads by scatter-gather over remote
218        // shards, no local engine (`docs/tael-server-scaling-ha.md` §3, Phase 2).
219        let shards = config
220            .query_shards
221            .iter()
222            .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
223            .collect::<Result<Vec<_>>>()?;
224        tracing::info!(
225            shards = shards.len(),
226            "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
227        );
228        Arc::new(FanoutStore::new(shards)?)
229    } else {
230        match config.storage {
231            #[cfg(feature = "duckdb")]
232            StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
233            #[cfg(not(feature = "duckdb"))]
234            StorageBackend::Duckdb => {
235                bail!(
236                    "DuckDB storage is not included in this build; reinstall with `--features duckdb` to use --storage duckdb"
237                )
238            }
239            StorageBackend::TaelBackend => {
240                // WAL replication: when standbys are configured, this node is a
241                // leader that ships every appended record to them before acking
242                // (§5.1). With no standbys the write path is unchanged.
243                let sinks: Vec<Arc<dyn WalSink>> = config
244                    .wal_standbys
245                    .iter()
246                    .map(|url| {
247                        // Stamp the leader epoch (for standby fencing) when a
248                        // coordinator is running; otherwise ship unfenced.
249                        let sink = match &coordinator {
250                            Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
251                            None => RemoteWalSink::new(url),
252                        };
253                        sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
254                    })
255                    .collect::<Result<Vec<_>>>()?;
256                if !sinks.is_empty() {
257                    tracing::info!(
258                        standbys = sinks.len(),
259                        required_acks = ?config.wal_required_acks,
260                        "WAL replication enabled: shipping to standbys (leader)"
261                    );
262                }
263                // Cold tier: local filesystem by default; GCS when configured
264                // (opt-in, requires the `cloud` feature).
265                let cold_backend = match config.object_store.cold {
266                    StoreLocation::Fs => None,
267                    StoreLocation::Gcs => Some(open_object_backend(
268                        StoreLocation::Gcs,
269                        Path::new(&config.data_dir).join("cold").as_path(),
270                        config.object_store.cold_bucket.as_deref(),
271                    )?),
272                };
273                // Comments: local JSONL by default; Postgres when configured.
274                let comments = open_comments(&config.comments, &config.data_dir)?;
275                let backend = Arc::new(TaelBackend::with_components(
276                    &config.data_dir,
277                    "tael-backend",
278                    sinks,
279                    config.wal_required_acks,
280                    cold_backend,
281                    comments,
282                )?);
283                search = Some(backend.search_index());
284                // Blob GC single-owner guard: on a shared (GCS) blob store,
285                // per-node mark-and-sweep would delete blobs other shards still
286                // reference, so it only runs on the designated coordinator. On a
287                // node-local store every node owns its own blobs and GCs freely.
288                let blob_gc_enabled = !config.object_store.blobs_shared()
289                    || config.object_store.blob_gc_coordinator;
290                if !blob_gc_enabled {
291                    tracing::info!(
292                        "blob GC disabled on this node: shared blob store, not the GC coordinator \
293                         (set TAEL_BLOB_GC_ROLE=coordinator on exactly one node)"
294                    );
295                }
296                spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs), blob_gc_enabled);
297                backend as Arc<dyn Store>
298            }
299        }
300    };
301    let bus = Arc::new(SpanBus::new()?);
302    let log_bus = Arc::new(LogBus::new()?);
303
304    tracing::info!(
305        otlp_grpc = %config.otlp_grpc_addr,
306        rest_api = %config.rest_api_addr,
307        rest_api_socket = ?config.rest_api_socket,
308        data_dir = %config.data_dir,
309        wal_dir = %config.wal_dir,
310        storage = ?config.storage,
311        "starting tael server"
312    );
313
314    let grpc_handle = tokio::spawn({
315        let store = Arc::clone(&store);
316        let blobs = Arc::clone(&blobs);
317        let bus = Arc::clone(&bus);
318        let log_bus = Arc::clone(&log_bus);
319        let addr = config.otlp_grpc_addr.parse()?;
320        async move {
321            let trace_service = ingest::otlp::OtlpTraceService::new(
322                Arc::clone(&store),
323                Arc::clone(&blobs),
324                search.clone(),
325                bus,
326            );
327            let logs_service = ingest::otlp_logs::OtlpLogsService::new(
328                Arc::clone(&store),
329                Arc::clone(&blobs),
330                log_bus,
331            );
332            let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
333            TonicServer::builder()
334                .add_service(
335                    opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
336                )
337                .add_service(
338                    opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
339                )
340                .add_service(
341                    opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
342                )
343                .serve_with_shutdown(addr, shutdown_signal())
344                .await
345                .expect("gRPC server failed");
346        }
347    });
348
349    let rest_handle = tokio::spawn({
350        let store = Arc::clone(&store);
351        let blobs = Arc::clone(&blobs);
352        let bus = Arc::clone(&bus);
353        let log_bus = Arc::clone(&log_bus);
354        let cluster = coordinator.clone();
355        let addr = config.rest_api_addr.clone();
356        let socket = config.rest_api_socket.clone();
357        async move {
358            let app = api::rest::router(store, blobs, bus, log_bus, cluster);
359            if let Some(socket) = socket {
360                #[cfg(unix)]
361                {
362                    prepare_unix_socket_path(&socket)?;
363                    let listener = tokio::net::UnixListener::bind(&socket)
364                        .with_context(|| format!("binding REST Unix socket {socket}"))?;
365                    tracing::info!(%socket, "REST API listening on Unix socket");
366                    let result = axum::serve(listener, app)
367                        .with_graceful_shutdown(shutdown_signal())
368                        .await
369                        .context("REST server failed");
370                    cleanup_unix_socket_path(&socket);
371                    result?;
372                }
373                #[cfg(not(unix))]
374                {
375                    bail!("REST Unix sockets are only supported on Unix platforms");
376                }
377            } else {
378                let listener = TcpListener::bind(&addr)
379                    .await
380                    .with_context(|| format!("binding REST addr {addr}"))?;
381                tracing::info!(%addr, "REST API listening");
382                axum::serve(listener, app)
383                    .with_graceful_shutdown(shutdown_signal())
384                    .await
385                    .context("REST server failed")?;
386            }
387            Ok::<(), anyhow::Error>(())
388        }
389    });
390
391    if !options.is_quiet() {
392        print_startup_banner(&config);
393    }
394
395    // Both listeners drain on SIGTERM/Ctrl-C; await both so in-flight requests
396    // finish before we flush and exit (`docs/tael-server-scaling-ha.md` §5.4).
397    let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
398    grpc_res?;
399    rest_res??;
400
401    // Best-effort flush so a restart/standby replays less WAL. Durability is
402    // already guaranteed by the per-write WAL fsync.
403    if let Err(e) = store.flush() {
404        tracing::warn!(error = %e, "flush on shutdown failed");
405    }
406    tracing::info!("tael server stopped");
407
408    Ok(())
409}
410
/// Export `wal_dir` as `WALRUS_DATA_DIR` for the current process.
///
/// walrus-rust currently exposes its storage root through process env only.
/// Tael owns the server process and sets this once before opening the WAL.
fn configure_walrus_data_dir(wal_dir: &str) {
    const WALRUS_DATA_DIR_ENV: &str = "WALRUS_DATA_DIR";

    // SAFETY: NOTE(review) — `set_var` is only sound while no other thread is
    // reading or writing the process environment; confirm nothing else touches
    // the environment concurrently at the point this is called.
    unsafe { std::env::set_var(WALRUS_DATA_DIR_ENV, wal_dir) };
}
418
419#[cfg(unix)]
420fn prepare_unix_socket_path(socket: &str) -> Result<()> {
421    use std::os::unix::fs::FileTypeExt;
422
423    let path = std::path::Path::new(socket);
424    if let Some(parent) = path.parent()
425        && !parent.as_os_str().is_empty()
426    {
427        std::fs::create_dir_all(parent)
428            .with_context(|| format!("creating REST socket directory {}", parent.display()))?;
429    }
430
431    match std::fs::symlink_metadata(path) {
432        Ok(meta) if meta.file_type().is_socket() => {
433            bail!(
434                "REST Unix socket path already exists: {}. Remove it if no server is running.",
435                path.display()
436            );
437        }
438        Ok(_) => {
439            bail!(
440                "REST Unix socket path exists and is not a socket: {}",
441                path.display()
442            );
443        }
444        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
445        Err(e) => Err(e).with_context(|| format!("checking REST socket path {}", path.display())),
446    }
447}
448
449#[cfg(unix)]
450fn cleanup_unix_socket_path(socket: &str) {
451    use std::os::unix::fs::FileTypeExt;
452
453    let path = std::path::Path::new(socket);
454    match std::fs::symlink_metadata(path) {
455        Ok(meta) if meta.file_type().is_socket() => {
456            if let Err(e) = std::fs::remove_file(path) {
457                tracing::warn!(socket = %path.display(), error = %e, "failed to remove REST Unix socket");
458            }
459        }
460        Ok(_) | Err(_) => {}
461    }
462}
463
464/// Friendly stdout banner shown on startup so a user running `tael serve`
465/// (with or without `--port`) immediately sees where to connect a CLI and
466/// where to point an OTLP exporter. Goes through `println!` so it's visible
467/// regardless of `RUST_LOG`.
468fn print_startup_banner(config: &ServerConfig) {
469    let rest = rest_endpoint_label(config);
470    let otlp = &config.otlp_grpc_addr;
471    let connect_flag = cli_connect_flag(config);
472
473    println!("tael server starting");
474    println!("  REST API     {rest}");
475    println!("  OTLP gRPC    {otlp}");
476    println!("  data dir     {}", config.data_dir);
477    println!("  WAL dir      {}", config.wal_dir);
478    println!("  storage      {:?}", config.storage);
479    println!();
480    println!("Connect a CLI from this machine:");
481    println!("  tael{connect_flag} services");
482    println!("  tael{connect_flag} live");
483    println!();
484    println!("Point a service at this server (OTLP):");
485    println!("  export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
486    println!("  export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
487    println!("  export OTEL_SERVICE_NAME=<your-service>");
488    println!();
489}
490
491/// Pick the CLI flag (if any) needed to reach this REST listener. Empty when
492/// REST is on the CLI default `127.0.0.1:7701`; `--port-rest N` when only the
493/// port differs; full `--server …` otherwise.
494fn cli_connect_flag(config: &ServerConfig) -> String {
495    if let Some(socket) = &config.rest_api_socket {
496        return format!(" --unix-socket {socket}");
497    }
498
499    let rest_addr = &config.rest_api_addr;
500    let (host, port) = match rest_addr.rsplit_once(':') {
501        Some((h, p)) => (h, p),
502        None => return String::new(),
503    };
504    let local = matches!(
505        host,
506        "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]"
507    );
508    match (local, port) {
509        (true, "7701") => String::new(),
510        (true, p) => format!(" --port-rest {p}"),
511        (false, _) => format!(" --server http://{rest_addr}"),
512    }
513}
514
515fn rest_endpoint_label(config: &ServerConfig) -> String {
516    match &config.rest_api_socket {
517        Some(socket) => format!("unix://{socket}"),
518        None => format!("http://{}", config.rest_api_addr),
519    }
520}
521
522/// Resolve when the process is asked to stop: Ctrl-C, or SIGTERM on Unix
523/// (the orchestrator's graceful-stop signal). Both listeners await their own
524/// copy; the OS delivers the signal to every registered handler.
525async fn shutdown_signal() {
526    let ctrl_c = async {
527        let _ = tokio::signal::ctrl_c().await;
528    };
529
530    #[cfg(unix)]
531    let terminate = async {
532        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
533            Ok(mut s) => {
534                s.recv().await;
535            }
536            Err(e) => {
537                tracing::warn!(error = %e, "failed to install SIGTERM handler");
538                std::future::pending::<()>().await;
539            }
540        }
541    };
542
543    #[cfg(not(unix))]
544    let terminate = std::future::pending::<()>();
545
546    tokio::select! {
547        _ = ctrl_c => {}
548        _ = terminate => {}
549    }
550    tracing::info!("shutdown signal received; draining listeners");
551}