mod api;
mod cluster;
mod config;
mod ingest;
mod log_bus;
mod promql;
mod span_bus;
mod storage;
use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, Result, bail};
use tokio::net::TcpListener;
use tonic::transport::Server as TonicServer;
use tracing_subscriber::EnvFilter;
pub use config::{ServerConfig, StorageBackend};
pub use storage::models::{
LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
TraceQuery,
};
#[cfg(feature = "duckdb")]
pub use storage::DuckDbStore;
pub use storage::{
BlobStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
};
use storage::{StoreLocation, open_comments, open_object_backend};
use log_bus::LogBus;
use span_bus::SpanBus;
/// Selects how much start-up output the server emits on the console.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerOutputMode {
/// `run_with_options` tries to install the `tracing_subscriber` fmt
/// subscriber and prints the start-up banner.
Default,
/// `run_with_options` skips both the subscriber installation and the
/// start-up banner.
Quiet,
}
/// Options passed to `run_with_options` alongside the server configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServerRunOptions {
/// Console output mode used during start-up.
pub output: ServerOutputMode,
}
impl Default for ServerRunOptions {
fn default() -> Self {
Self {
output: ServerOutputMode::Default,
}
}
}
impl ServerRunOptions {
pub fn quiet() -> Self {
Self {
output: ServerOutputMode::Quiet,
}
}
fn is_quiet(self) -> bool {
matches!(self.output, ServerOutputMode::Quiet)
}
}
/// Spawns the periodic maintenance loop for a [`TaelBackend`] on the Tokio runtime.
///
/// Every cycle runs on the blocking thread pool and, in order:
/// 1. compacts spans, then logs/metrics, older than the hot-tier cutoff,
/// 2. enforces span retention,
/// 3. garbage-collects blobs not referenced by the backend, but only when
///    `blob_gc_enabled` is true.
///
/// Tunables are read once from the environment when this function is called:
/// - `TAEL_HOT_TIER_HOURS` (default 24)
/// - `TAEL_COMPACT_INTERVAL_SECS` (default 3600, must be greater than zero)
/// - `TAEL_TRACE_RETENTION_DAYS` (default 365)
///
/// Values that are unset fall back to the default silently; values that are
/// set but unusable fall back to the default with a warning. Failures inside a
/// cycle are logged and the loop keeps running.
fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>, blob_gc_enabled: bool) {
    const DEFAULT_HOT_TIER_HOURS: i64 = 24;
    const DEFAULT_COMPACT_INTERVAL_SECS: u64 = 3600;
    const DEFAULT_TRACE_RETENTION_DAYS: i64 = 365;

    /// Reads `name` from the environment and parses it as `T`; returns
    /// `default` when the variable is unset, not Unicode, or unparsable.
    fn env_or<T>(name: &str, default: T) -> T
    where
        T: std::str::FromStr + std::fmt::Display,
    {
        let Ok(raw) = std::env::var(name) else {
            return default;
        };
        match raw.parse::<T>() {
            Ok(value) => value,
            Err(_) => {
                // A typo in an override should be visible, not silently ignored.
                tracing::warn!(
                    variable = name,
                    value = %raw,
                    default = %default,
                    "ignoring unparsable environment override"
                );
                default
            }
        }
    }

    let window_hours = env_or("TAEL_HOT_TIER_HOURS", DEFAULT_HOT_TIER_HOURS);
    let retention_days = env_or("TAEL_TRACE_RETENTION_DAYS", DEFAULT_TRACE_RETENTION_DAYS);
    // `tokio::time::interval` panics on a zero period, which would kill the
    // spawned task and disable maintenance without any log line.
    let interval_secs =
        match env_or("TAEL_COMPACT_INTERVAL_SECS", DEFAULT_COMPACT_INTERVAL_SECS) {
            0 => {
                tracing::warn!(
                    default = DEFAULT_COMPACT_INTERVAL_SECS,
                    "TAEL_COMPACT_INTERVAL_SECS must be greater than zero; using default"
                );
                DEFAULT_COMPACT_INTERVAL_SECS
            }
            secs => secs,
        };

    tokio::spawn(async move {
        let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
        // If a cycle overruns the period, wait a full period before the next
        // one instead of firing the missed ticks back-to-back.
        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            // The first tick completes immediately, so one cycle runs at start-up.
            tick.tick().await;
            let backend = Arc::clone(&backend);
            let blobs = Arc::clone(&blobs);
            let result = tokio::task::spawn_blocking(move || {
                let now = chrono::Utc::now();
                let hot_cutoff = now - chrono::Duration::hours(window_hours);
                let mut compacted = backend.compact_spans(hot_cutoff)?;
                compacted += backend.compact_logs_metrics(hot_cutoff)?;
                let dropped =
                    backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
                let blobs_gcd = if blob_gc_enabled {
                    let live = backend.collect_live_blob_hashes()?;
                    blobs.gc(&live)?
                } else {
                    0
                };
                anyhow::Ok((compacted, dropped, blobs_gcd))
            })
            .await;
            match result {
                // Only log cycles that actually changed something.
                Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
                    compacted = c,
                    partitions_dropped = d,
                    blobs_gcd = g,
                    "tael-backend maintenance"
                ),
                Ok(Ok(_)) => {}
                Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
                Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
            }
        }
    });
}
/// Runs the server with the default [`ServerRunOptions`].
///
/// Equivalent to calling [`run_with_options`] with `ServerRunOptions::default()`.
pub async fn run(config: ServerConfig) -> Result<()> {
    let options = ServerRunOptions::default();
    run_with_options(config, options).await
}
/// Runs the server with quiet [`ServerRunOptions`].
///
/// Equivalent to calling [`run_with_options`] with `ServerRunOptions::quiet()`.
pub async fn run_embedded(config: ServerConfig) -> Result<()> {
    let options = ServerRunOptions::quiet();
    run_with_options(config, options).await
}
pub async fn run_with_options(config: ServerConfig, options: ServerRunOptions) -> Result<()> {
if !options.is_quiet() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();
}
configure_walrus_data_dir(&config.wal_dir);
let blobs = Arc::new(match config.object_store.blobs {
StoreLocation::Fs => BlobStore::new(&config.data_dir)?,
StoreLocation::Gcs => {
let backend = open_object_backend(
StoreLocation::Gcs,
Path::new(&config.data_dir).join("blobs").as_path(),
config.object_store.blob_bucket.as_deref(),
)?;
BlobStore::with_backend(backend)?
}
});
let coordinator = match &config.cluster {
Some(cs) => {
let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
node_id: cs.node_id.clone(),
listen_addr: cs
.listen_addr
.parse()
.context("parsing TAEL_CLUSTER_LISTEN")?,
advertise_addr: cs
.advertise_addr
.parse()
.context("parsing TAEL_CLUSTER_ADVERTISE")?,
seeds: cs.seeds.clone(),
cluster_id: cs.cluster_id.clone(),
})
.await?;
Some(coord)
}
None => None,
};
let mut search: Option<Arc<storage::SearchIndex>> = None;
let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
let shards = config
.query_shards
.iter()
.map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
.collect::<Result<Vec<_>>>()?;
tracing::info!(
shards = shards.len(),
"query fan-out mode: reads scatter-gather across remote shards (no local engine)"
);
Arc::new(FanoutStore::new(shards)?)
} else {
match config.storage {
#[cfg(feature = "duckdb")]
StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
#[cfg(not(feature = "duckdb"))]
StorageBackend::Duckdb => {
bail!(
"DuckDB storage is not included in this build; reinstall with `--features duckdb` to use --storage duckdb"
)
}
StorageBackend::TaelBackend => {
let sinks: Vec<Arc<dyn WalSink>> = config
.wal_standbys
.iter()
.map(|url| {
let sink = match &coordinator {
Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
None => RemoteWalSink::new(url),
};
sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
})
.collect::<Result<Vec<_>>>()?;
if !sinks.is_empty() {
tracing::info!(
standbys = sinks.len(),
required_acks = ?config.wal_required_acks,
"WAL replication enabled: shipping to standbys (leader)"
);
}
let cold_backend = match config.object_store.cold {
StoreLocation::Fs => None,
StoreLocation::Gcs => Some(open_object_backend(
StoreLocation::Gcs,
Path::new(&config.data_dir).join("cold").as_path(),
config.object_store.cold_bucket.as_deref(),
)?),
};
let comments = open_comments(&config.comments, &config.data_dir)?;
let backend = Arc::new(TaelBackend::with_components(
&config.data_dir,
"tael-backend",
sinks,
config.wal_required_acks,
cold_backend,
comments,
)?);
search = Some(backend.search_index());
let blob_gc_enabled = !config.object_store.blobs_shared()
|| config.object_store.blob_gc_coordinator;
if !blob_gc_enabled {
tracing::info!(
"blob GC disabled on this node: shared blob store, not the GC coordinator \
(set TAEL_BLOB_GC_ROLE=coordinator on exactly one node)"
);
}
spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs), blob_gc_enabled);
backend as Arc<dyn Store>
}
}
};
let bus = Arc::new(SpanBus::new()?);
let log_bus = Arc::new(LogBus::new()?);
tracing::info!(
otlp_grpc = %config.otlp_grpc_addr,
rest_api = %config.rest_api_addr,
rest_api_socket = ?config.rest_api_socket,
data_dir = %config.data_dir,
wal_dir = %config.wal_dir,
storage = ?config.storage,
"starting tael server"
);
let grpc_handle = tokio::spawn({
let store = Arc::clone(&store);
let blobs = Arc::clone(&blobs);
let bus = Arc::clone(&bus);
let log_bus = Arc::clone(&log_bus);
let addr = config.otlp_grpc_addr.parse()?;
async move {
let trace_service = ingest::otlp::OtlpTraceService::new(
Arc::clone(&store),
Arc::clone(&blobs),
search.clone(),
bus,
);
let logs_service = ingest::otlp_logs::OtlpLogsService::new(
Arc::clone(&store),
Arc::clone(&blobs),
log_bus,
);
let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
TonicServer::builder()
.add_service(
opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
)
.add_service(
opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
)
.add_service(
opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
)
.serve_with_shutdown(addr, shutdown_signal())
.await
.expect("gRPC server failed");
}
});
let rest_handle = tokio::spawn({
let store = Arc::clone(&store);
let blobs = Arc::clone(&blobs);
let bus = Arc::clone(&bus);
let log_bus = Arc::clone(&log_bus);
let cluster = coordinator.clone();
let addr = config.rest_api_addr.clone();
let socket = config.rest_api_socket.clone();
async move {
let app = api::rest::router(store, blobs, bus, log_bus, cluster);
if let Some(socket) = socket {
#[cfg(unix)]
{
prepare_unix_socket_path(&socket)?;
let listener = tokio::net::UnixListener::bind(&socket)
.with_context(|| format!("binding REST Unix socket {socket}"))?;
tracing::info!(%socket, "REST API listening on Unix socket");
let result = axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.context("REST server failed");
cleanup_unix_socket_path(&socket);
result?;
}
#[cfg(not(unix))]
{
bail!("REST Unix sockets are only supported on Unix platforms");
}
} else {
let listener = TcpListener::bind(&addr)
.await
.with_context(|| format!("binding REST addr {addr}"))?;
tracing::info!(%addr, "REST API listening");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.context("REST server failed")?;
}
Ok::<(), anyhow::Error>(())
}
});
if !options.is_quiet() {
print_startup_banner(&config);
}
let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
grpc_res?;
rest_res??;
if let Err(e) = store.flush() {
tracing::warn!(error = %e, "flush on shutdown failed");
}
tracing::info!("tael server stopped");
Ok(())
}
/// Exports `wal_dir` to this process's environment as `WALRUS_DATA_DIR`,
/// overwriting any existing value.
fn configure_walrus_data_dir(wal_dir: &str) {
    const WALRUS_DATA_DIR_VAR: &str = "WALRUS_DATA_DIR";

    // SAFETY: NOTE(review) — `set_var` is only sound while no other thread
    // touches the environment. This is called from async server start-up, so
    // confirm nothing reads or writes the environment concurrently.
    unsafe { std::env::set_var(WALRUS_DATA_DIR_VAR, wal_dir) };
}
#[cfg(unix)]
fn prepare_unix_socket_path(socket: &str) -> Result<()> {
use std::os::unix::fs::FileTypeExt;
let path = std::path::Path::new(socket);
if let Some(parent) = path.parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent)
.with_context(|| format!("creating REST socket directory {}", parent.display()))?;
}
match std::fs::symlink_metadata(path) {
Ok(meta) if meta.file_type().is_socket() => {
bail!(
"REST Unix socket path already exists: {}. Remove it if no server is running.",
path.display()
);
}
Ok(_) => {
bail!(
"REST Unix socket path exists and is not a socket: {}",
path.display()
);
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
Err(e) => Err(e).with_context(|| format!("checking REST socket path {}", path.display())),
}
}
#[cfg(unix)]
fn cleanup_unix_socket_path(socket: &str) {
use std::os::unix::fs::FileTypeExt;
let path = std::path::Path::new(socket);
match std::fs::symlink_metadata(path) {
Ok(meta) if meta.file_type().is_socket() => {
if let Err(e) = std::fs::remove_file(path) {
tracing::warn!(socket = %path.display(), error = %e, "failed to remove REST Unix socket");
}
}
Ok(_) | Err(_) => {}
}
}
/// Prints the start-up banner to stdout.
///
/// The banner lists the REST and OTLP endpoints, the data and WAL directories
/// and the storage backend, followed by example CLI commands and the OTLP
/// environment variables for pointing a service at this server.
fn print_startup_banner(config: &ServerConfig) {
    let connect_flag = cli_connect_flag(config);
    let otlp = &config.otlp_grpc_addr;

    // Summary of what this server exposes.
    println!("tael server starting");
    println!(" REST API {}", rest_endpoint_label(config));
    println!(" OTLP gRPC {}", otlp);
    println!(" data dir {}", config.data_dir);
    println!(" WAL dir {}", config.wal_dir);
    println!(" storage {:?}", config.storage);
    println!();

    // CLI invocations that reach this server from the same machine.
    println!("Connect a CLI from this machine:");
    println!(" tael{} services", connect_flag);
    println!(" tael{} live", connect_flag);
    println!();

    // Exporter settings for an instrumented service.
    println!("Point a service at this server (OTLP):");
    println!(" export OTEL_EXPORTER_OTLP_ENDPOINT=http://{}", otlp);
    println!(" export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
    println!(" export OTEL_SERVICE_NAME=<your-service>");
    println!();
}
/// Builds the CLI argument suffix (with a leading space) that the start-up
/// banner appends to `tael` so a CLI on this machine reaches this server.
///
/// - With a REST Unix socket configured: ` --unix-socket <path>`.
/// - With no `:` in the REST address: an empty string.
/// - With a local host and port `7701`: an empty string.
/// - With a local host and another port: ` --port-rest <port>`.
/// - Otherwise: ` --server http://<rest_api_addr>`.
///
/// Loopback names and wildcard bind addresses count as local. The IPv6
/// wildcard (`::` / `[::]`) is included alongside `0.0.0.0`, because the hint
/// is for a CLI on this machine and a wildcard address is not a useful
/// `--server` target.
fn cli_connect_flag(config: &ServerConfig) -> String {
    if let Some(socket) = &config.rest_api_socket {
        return format!(" --unix-socket {socket}");
    }

    let rest_addr = &config.rest_api_addr;
    // Split on the last `:` so bracketed IPv6 hosts keep their inner colons.
    let Some((host, port)) = rest_addr.rsplit_once(':') else {
        return String::new();
    };

    let local = matches!(
        host,
        "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]" | "::" | "[::]"
    );
    match (local, port) {
        (true, "7701") => String::new(),
        (true, p) => format!(" --port-rest {p}"),
        (false, _) => format!(" --server http://{rest_addr}"),
    }
}
/// Formats the REST endpoint for display: `unix://<socket>` when a Unix
/// socket is configured, otherwise `http://<rest_api_addr>`.
fn rest_endpoint_label(config: &ServerConfig) -> String {
    if let Some(socket) = &config.rest_api_socket {
        format!("unix://{socket}")
    } else {
        format!("http://{}", config.rest_api_addr)
    }
}
async fn shutdown_signal() {
let ctrl_c = async {
let _ = tokio::signal::ctrl_c().await;
};
#[cfg(unix)]
let terminate = async {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut s) => {
s.recv().await;
}
Err(e) => {
tracing::warn!(error = %e, "failed to install SIGTERM handler");
std::future::pending::<()>().await;
}
}
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {}
_ = terminate => {}
}
tracing::info!("shutdown signal received; draining listeners");
}