1mod api;
10mod cluster;
11mod config;
12mod ingest;
13mod log_bus;
14mod promql;
15mod span_bus;
16mod storage;
17
18use std::path::Path;
19use std::sync::Arc;
20
21use anyhow::{Context, Result, bail};
22use tokio::net::TcpListener;
23use tonic::transport::Server as TonicServer;
24use tracing_subscriber::EnvFilter;
25
26pub use config::{ServerConfig, StorageBackend};
27pub use storage::models::{
28 LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
29 TraceQuery,
30};
31#[cfg(feature = "duckdb")]
32pub use storage::DuckDbStore;
33pub use storage::{
34 BlobStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
35};
36use storage::{StoreLocation, open_comments, open_object_backend};
37
38use log_bus::LogBus;
39use span_bus::SpanBus;
40
/// Controls how much the server writes to the terminal by itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerOutputMode {
    /// Standalone behavior: install a `tracing` fmt subscriber and print the startup banner.
    Default,
    /// Embedded behavior: leave tracing setup to the host and print no banner.
    Quiet,
}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct ServerRunOptions {
57 pub output: ServerOutputMode,
58}
59
60impl Default for ServerRunOptions {
61 fn default() -> Self {
62 Self {
63 output: ServerOutputMode::Default,
64 }
65 }
66}
67
68impl ServerRunOptions {
69 pub fn quiet() -> Self {
71 Self {
72 output: ServerOutputMode::Quiet,
73 }
74 }
75
76 fn is_quiet(self) -> bool {
77 matches!(self.output, ServerOutputMode::Quiet)
78 }
79}
80
81fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>, blob_gc_enabled: bool) {
87 let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
88 .ok()
89 .and_then(|s| s.parse().ok())
90 .unwrap_or(24);
91 let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
92 .ok()
93 .and_then(|s| s.parse().ok())
94 .unwrap_or(3600);
95 let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
97 .ok()
98 .and_then(|s| s.parse().ok())
99 .unwrap_or(365);
100 tokio::spawn(async move {
101 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
102 loop {
103 tick.tick().await;
104 let backend = Arc::clone(&backend);
105 let blobs = Arc::clone(&blobs);
106 let result = tokio::task::spawn_blocking(move || {
107 let now = chrono::Utc::now();
108 let hot_cutoff = now - chrono::Duration::hours(window_hours);
109 let mut compacted = backend.compact_spans(hot_cutoff)?;
110 compacted += backend.compact_logs_metrics(hot_cutoff)?;
111 let dropped =
112 backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
113 let blobs_gcd = if blob_gc_enabled {
119 let live = backend.collect_live_blob_hashes()?;
120 blobs.gc(&live)?
121 } else {
122 0
123 };
124 anyhow::Ok((compacted, dropped, blobs_gcd))
125 })
126 .await;
127 match result {
128 Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
129 compacted = c,
130 partitions_dropped = d,
131 blobs_gcd = g,
132 "tael-backend maintenance"
133 ),
134 Ok(Ok(_)) => {}
135 Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
136 Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
137 }
138 }
139 });
140}
141
142pub async fn run(config: ServerConfig) -> Result<()> {
148 run_with_options(config, ServerRunOptions::default()).await
149}
150
151pub async fn run_embedded(config: ServerConfig) -> Result<()> {
156 run_with_options(config, ServerRunOptions::quiet()).await
157}
158
159pub async fn run_with_options(config: ServerConfig, options: ServerRunOptions) -> Result<()> {
165 if !options.is_quiet() {
169 let _ = tracing_subscriber::fmt()
170 .with_env_filter(EnvFilter::from_default_env())
171 .try_init();
172 }
173
174 configure_walrus_data_dir(&config.wal_dir);
175
176 let blobs = Arc::new(match config.object_store.blobs {
179 StoreLocation::Fs => BlobStore::new(&config.data_dir)?,
180 StoreLocation::Gcs => {
181 let backend = open_object_backend(
182 StoreLocation::Gcs,
183 Path::new(&config.data_dir).join("blobs").as_path(),
184 config.object_store.blob_bucket.as_deref(),
185 )?;
186 BlobStore::with_backend(backend)?
187 }
188 });
189
190 let coordinator = match &config.cluster {
193 Some(cs) => {
194 let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
195 node_id: cs.node_id.clone(),
196 listen_addr: cs
197 .listen_addr
198 .parse()
199 .context("parsing TAEL_CLUSTER_LISTEN")?,
200 advertise_addr: cs
201 .advertise_addr
202 .parse()
203 .context("parsing TAEL_CLUSTER_ADVERTISE")?,
204 seeds: cs.seeds.clone(),
205 cluster_id: cs.cluster_id.clone(),
206 })
207 .await?;
208 Some(coord)
209 }
210 None => None,
211 };
212
213 let mut search: Option<Arc<storage::SearchIndex>> = None;
216 let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
217 let shards = config
220 .query_shards
221 .iter()
222 .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
223 .collect::<Result<Vec<_>>>()?;
224 tracing::info!(
225 shards = shards.len(),
226 "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
227 );
228 Arc::new(FanoutStore::new(shards)?)
229 } else {
230 match config.storage {
231 #[cfg(feature = "duckdb")]
232 StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
233 #[cfg(not(feature = "duckdb"))]
234 StorageBackend::Duckdb => {
235 bail!(
236 "DuckDB storage is not included in this build; reinstall with `--features duckdb` to use --storage duckdb"
237 )
238 }
239 StorageBackend::TaelBackend => {
240 let sinks: Vec<Arc<dyn WalSink>> = config
244 .wal_standbys
245 .iter()
246 .map(|url| {
247 let sink = match &coordinator {
250 Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
251 None => RemoteWalSink::new(url),
252 };
253 sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
254 })
255 .collect::<Result<Vec<_>>>()?;
256 if !sinks.is_empty() {
257 tracing::info!(
258 standbys = sinks.len(),
259 required_acks = ?config.wal_required_acks,
260 "WAL replication enabled: shipping to standbys (leader)"
261 );
262 }
263 let cold_backend = match config.object_store.cold {
266 StoreLocation::Fs => None,
267 StoreLocation::Gcs => Some(open_object_backend(
268 StoreLocation::Gcs,
269 Path::new(&config.data_dir).join("cold").as_path(),
270 config.object_store.cold_bucket.as_deref(),
271 )?),
272 };
273 let comments = open_comments(&config.comments, &config.data_dir)?;
275 let backend = Arc::new(TaelBackend::with_components(
276 &config.data_dir,
277 "tael-backend",
278 sinks,
279 config.wal_required_acks,
280 cold_backend,
281 comments,
282 )?);
283 search = Some(backend.search_index());
284 let blob_gc_enabled = !config.object_store.blobs_shared()
289 || config.object_store.blob_gc_coordinator;
290 if !blob_gc_enabled {
291 tracing::info!(
292 "blob GC disabled on this node: shared blob store, not the GC coordinator \
293 (set TAEL_BLOB_GC_ROLE=coordinator on exactly one node)"
294 );
295 }
296 spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs), blob_gc_enabled);
297 backend as Arc<dyn Store>
298 }
299 }
300 };
301 let bus = Arc::new(SpanBus::new()?);
302 let log_bus = Arc::new(LogBus::new()?);
303
304 tracing::info!(
305 otlp_grpc = %config.otlp_grpc_addr,
306 rest_api = %config.rest_api_addr,
307 rest_api_socket = ?config.rest_api_socket,
308 data_dir = %config.data_dir,
309 wal_dir = %config.wal_dir,
310 storage = ?config.storage,
311 "starting tael server"
312 );
313
314 let grpc_handle = tokio::spawn({
315 let store = Arc::clone(&store);
316 let blobs = Arc::clone(&blobs);
317 let bus = Arc::clone(&bus);
318 let log_bus = Arc::clone(&log_bus);
319 let addr = config.otlp_grpc_addr.parse()?;
320 async move {
321 let trace_service = ingest::otlp::OtlpTraceService::new(
322 Arc::clone(&store),
323 Arc::clone(&blobs),
324 search.clone(),
325 bus,
326 );
327 let logs_service = ingest::otlp_logs::OtlpLogsService::new(
328 Arc::clone(&store),
329 Arc::clone(&blobs),
330 log_bus,
331 );
332 let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
333 TonicServer::builder()
334 .add_service(
335 opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
336 )
337 .add_service(
338 opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
339 )
340 .add_service(
341 opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
342 )
343 .serve_with_shutdown(addr, shutdown_signal())
344 .await
345 .expect("gRPC server failed");
346 }
347 });
348
349 let rest_handle = tokio::spawn({
350 let store = Arc::clone(&store);
351 let blobs = Arc::clone(&blobs);
352 let bus = Arc::clone(&bus);
353 let log_bus = Arc::clone(&log_bus);
354 let cluster = coordinator.clone();
355 let addr = config.rest_api_addr.clone();
356 let socket = config.rest_api_socket.clone();
357 async move {
358 let app = api::rest::router(store, blobs, bus, log_bus, cluster);
359 if let Some(socket) = socket {
360 #[cfg(unix)]
361 {
362 prepare_unix_socket_path(&socket)?;
363 let listener = tokio::net::UnixListener::bind(&socket)
364 .with_context(|| format!("binding REST Unix socket {socket}"))?;
365 tracing::info!(%socket, "REST API listening on Unix socket");
366 let result = axum::serve(listener, app)
367 .with_graceful_shutdown(shutdown_signal())
368 .await
369 .context("REST server failed");
370 cleanup_unix_socket_path(&socket);
371 result?;
372 }
373 #[cfg(not(unix))]
374 {
375 bail!("REST Unix sockets are only supported on Unix platforms");
376 }
377 } else {
378 let listener = TcpListener::bind(&addr)
379 .await
380 .with_context(|| format!("binding REST addr {addr}"))?;
381 tracing::info!(%addr, "REST API listening");
382 axum::serve(listener, app)
383 .with_graceful_shutdown(shutdown_signal())
384 .await
385 .context("REST server failed")?;
386 }
387 Ok::<(), anyhow::Error>(())
388 }
389 });
390
391 if !options.is_quiet() {
392 print_startup_banner(&config);
393 }
394
395 let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
398 grpc_res?;
399 rest_res??;
400
401 if let Err(e) = store.flush() {
404 tracing::warn!(error = %e, "flush on shutdown failed");
405 }
406 tracing::info!("tael server stopped");
407
408 Ok(())
409}
410
/// Exports `wal_dir` as the `WALRUS_DATA_DIR` environment variable.
///
/// Judging by its name, the walrus WAL reads this variable (NOTE(review): confirm when it
/// is read). The variable is written only when it does not already hold `wal_dir`. That
/// way, repeated starts in one process (e.g. embedded use) do not mutate the environment
/// again.
fn configure_walrus_data_dir(wal_dir: &str) {
    if std::env::var_os("WALRUS_DATA_DIR").as_deref() == Some(std::ffi::OsStr::new(wal_dir)) {
        return;
    }
    // SAFETY: `set_var` is sound only if no other thread reads or writes the process
    // environment concurrently. Callers should invoke this during startup, before spawning
    // tasks that touch the environment. NOTE(review): the server calls this from async
    // code, so runtime worker threads (or threads started by an embedding host) may
    // already exist. Confirm none of them access the environment at this point.
    unsafe {
        std::env::set_var("WALRUS_DATA_DIR", wal_dir);
    }
}
418
419#[cfg(unix)]
420fn prepare_unix_socket_path(socket: &str) -> Result<()> {
421 use std::os::unix::fs::FileTypeExt;
422
423 let path = std::path::Path::new(socket);
424 if let Some(parent) = path.parent()
425 && !parent.as_os_str().is_empty()
426 {
427 std::fs::create_dir_all(parent)
428 .with_context(|| format!("creating REST socket directory {}", parent.display()))?;
429 }
430
431 match std::fs::symlink_metadata(path) {
432 Ok(meta) if meta.file_type().is_socket() => {
433 bail!(
434 "REST Unix socket path already exists: {}. Remove it if no server is running.",
435 path.display()
436 );
437 }
438 Ok(_) => {
439 bail!(
440 "REST Unix socket path exists and is not a socket: {}",
441 path.display()
442 );
443 }
444 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
445 Err(e) => Err(e).with_context(|| format!("checking REST socket path {}", path.display())),
446 }
447}
448
449#[cfg(unix)]
450fn cleanup_unix_socket_path(socket: &str) {
451 use std::os::unix::fs::FileTypeExt;
452
453 let path = std::path::Path::new(socket);
454 match std::fs::symlink_metadata(path) {
455 Ok(meta) if meta.file_type().is_socket() => {
456 if let Err(e) = std::fs::remove_file(path) {
457 tracing::warn!(socket = %path.display(), error = %e, "failed to remove REST Unix socket");
458 }
459 }
460 Ok(_) | Err(_) => {}
461 }
462}
463
464fn print_startup_banner(config: &ServerConfig) {
469 let rest = rest_endpoint_label(config);
470 let otlp = &config.otlp_grpc_addr;
471 let connect_flag = cli_connect_flag(config);
472
473 println!("tael server starting");
474 println!(" REST API {rest}");
475 println!(" OTLP gRPC {otlp}");
476 println!(" data dir {}", config.data_dir);
477 println!(" WAL dir {}", config.wal_dir);
478 println!(" storage {:?}", config.storage);
479 println!();
480 println!("Connect a CLI from this machine:");
481 println!(" tael{connect_flag} services");
482 println!(" tael{connect_flag} live");
483 println!();
484 println!("Point a service at this server (OTLP):");
485 println!(" export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
486 println!(" export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
487 println!(" export OTEL_SERVICE_NAME=<your-service>");
488 println!();
489}
490
491fn cli_connect_flag(config: &ServerConfig) -> String {
495 if let Some(socket) = &config.rest_api_socket {
496 return format!(" --unix-socket {socket}");
497 }
498
499 let rest_addr = &config.rest_api_addr;
500 let (host, port) = match rest_addr.rsplit_once(':') {
501 Some((h, p)) => (h, p),
502 None => return String::new(),
503 };
504 let local = matches!(
505 host,
506 "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]"
507 );
508 match (local, port) {
509 (true, "7701") => String::new(),
510 (true, p) => format!(" --port-rest {p}"),
511 (false, _) => format!(" --server http://{rest_addr}"),
512 }
513}
514
515fn rest_endpoint_label(config: &ServerConfig) -> String {
516 match &config.rest_api_socket {
517 Some(socket) => format!("unix://{socket}"),
518 None => format!("http://{}", config.rest_api_addr),
519 }
520}
521
522async fn shutdown_signal() {
526 let ctrl_c = async {
527 let _ = tokio::signal::ctrl_c().await;
528 };
529
530 #[cfg(unix)]
531 let terminate = async {
532 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
533 Ok(mut s) => {
534 s.recv().await;
535 }
536 Err(e) => {
537 tracing::warn!(error = %e, "failed to install SIGTERM handler");
538 std::future::pending::<()>().await;
539 }
540 }
541 };
542
543 #[cfg(not(unix))]
544 let terminate = std::future::pending::<()>();
545
546 tokio::select! {
547 _ = ctrl_c => {}
548 _ = terminate => {}
549 }
550 tracing::info!("shutdown signal received; draining listeners");
551}