1mod api;
10mod cluster;
11mod config;
12mod ingest;
13mod log_bus;
14mod promql;
15mod span_bus;
16mod storage;
17
18use std::sync::Arc;
19
20use anyhow::{Context, Result, bail};
21use tokio::net::TcpListener;
22use tonic::transport::Server as TonicServer;
23use tracing_subscriber::EnvFilter;
24
25pub use config::{ServerConfig, StorageBackend};
26pub use storage::models::{
27 LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
28 TraceQuery,
29};
30#[cfg(feature = "duckdb")]
31pub use storage::DuckDbStore;
32pub use storage::{
33 BlobStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
34};
35
36use log_bus::LogBus;
37use span_bus::SpanBus;
38
/// Controls how much the server writes to the terminal/log output on startup.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ServerOutputMode {
    /// Install the `tracing` subscriber and print the startup banner.
    #[default]
    Default,
    /// Skip both the `tracing` subscriber installation and the startup banner.
    Quiet,
}

/// Options accepted by [`run_with_options`].
///
/// `ServerRunOptions::default()` selects [`ServerOutputMode::Default`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ServerRunOptions {
    /// Output behaviour of the running server.
    pub output: ServerOutputMode,
}

impl ServerRunOptions {
    /// Options for a server that stays silent on startup (what [`run_embedded`] uses).
    pub fn quiet() -> Self {
        Self {
            output: ServerOutputMode::Quiet,
        }
    }

    /// Returns `true` when the quiet output mode is selected.
    fn is_quiet(self) -> bool {
        self.output == ServerOutputMode::Quiet
    }
}
78
79fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
85 let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
86 .ok()
87 .and_then(|s| s.parse().ok())
88 .unwrap_or(24);
89 let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
90 .ok()
91 .and_then(|s| s.parse().ok())
92 .unwrap_or(3600);
93 let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
95 .ok()
96 .and_then(|s| s.parse().ok())
97 .unwrap_or(365);
98 tokio::spawn(async move {
99 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
100 loop {
101 tick.tick().await;
102 let backend = Arc::clone(&backend);
103 let blobs = Arc::clone(&blobs);
104 let result = tokio::task::spawn_blocking(move || {
105 let now = chrono::Utc::now();
106 let hot_cutoff = now - chrono::Duration::hours(window_hours);
107 let mut compacted = backend.compact_spans(hot_cutoff)?;
108 compacted += backend.compact_logs_metrics(hot_cutoff)?;
109 let dropped =
110 backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
111 let live = backend.collect_live_blob_hashes()?;
114 let blobs_gcd = blobs.gc(&live)?;
115 anyhow::Ok((compacted, dropped, blobs_gcd))
116 })
117 .await;
118 match result {
119 Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
120 compacted = c,
121 partitions_dropped = d,
122 blobs_gcd = g,
123 "tael-backend maintenance"
124 ),
125 Ok(Ok(_)) => {}
126 Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
127 Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
128 }
129 }
130 });
131}
132
133pub async fn run(config: ServerConfig) -> Result<()> {
139 run_with_options(config, ServerRunOptions::default()).await
140}
141
142pub async fn run_embedded(config: ServerConfig) -> Result<()> {
147 run_with_options(config, ServerRunOptions::quiet()).await
148}
149
150pub async fn run_with_options(config: ServerConfig, options: ServerRunOptions) -> Result<()> {
156 if !options.is_quiet() {
160 let _ = tracing_subscriber::fmt()
161 .with_env_filter(EnvFilter::from_default_env())
162 .try_init();
163 }
164
165 configure_walrus_data_dir(&config.wal_dir);
166
167 let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
168
169 let coordinator = match &config.cluster {
172 Some(cs) => {
173 let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
174 node_id: cs.node_id.clone(),
175 listen_addr: cs
176 .listen_addr
177 .parse()
178 .context("parsing TAEL_CLUSTER_LISTEN")?,
179 advertise_addr: cs
180 .advertise_addr
181 .parse()
182 .context("parsing TAEL_CLUSTER_ADVERTISE")?,
183 seeds: cs.seeds.clone(),
184 cluster_id: cs.cluster_id.clone(),
185 })
186 .await?;
187 Some(coord)
188 }
189 None => None,
190 };
191
192 let mut search: Option<Arc<storage::SearchIndex>> = None;
195 let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
196 let shards = config
199 .query_shards
200 .iter()
201 .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
202 .collect::<Result<Vec<_>>>()?;
203 tracing::info!(
204 shards = shards.len(),
205 "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
206 );
207 Arc::new(FanoutStore::new(shards)?)
208 } else {
209 match config.storage {
210 #[cfg(feature = "duckdb")]
211 StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
212 #[cfg(not(feature = "duckdb"))]
213 StorageBackend::Duckdb => {
214 bail!(
215 "DuckDB storage is not included in this build; reinstall with `--features duckdb` to use --storage duckdb"
216 )
217 }
218 StorageBackend::TaelBackend => {
219 let sinks: Vec<Arc<dyn WalSink>> = config
223 .wal_standbys
224 .iter()
225 .map(|url| {
226 let sink = match &coordinator {
229 Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
230 None => RemoteWalSink::new(url),
231 };
232 sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
233 })
234 .collect::<Result<Vec<_>>>()?;
235 let backend = Arc::new(if sinks.is_empty() {
236 TaelBackend::new(&config.data_dir)?
237 } else {
238 tracing::info!(
239 standbys = sinks.len(),
240 required_acks = ?config.wal_required_acks,
241 "WAL replication enabled: shipping to standbys (leader)"
242 );
243 TaelBackend::with_wal_key_and_sinks(
244 &config.data_dir,
245 "tael-backend",
246 sinks,
247 config.wal_required_acks,
248 )?
249 });
250 search = Some(backend.search_index());
251 spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
252 backend as Arc<dyn Store>
253 }
254 }
255 };
256 let bus = Arc::new(SpanBus::new()?);
257 let log_bus = Arc::new(LogBus::new()?);
258
259 tracing::info!(
260 otlp_grpc = %config.otlp_grpc_addr,
261 rest_api = %config.rest_api_addr,
262 rest_api_socket = ?config.rest_api_socket,
263 data_dir = %config.data_dir,
264 wal_dir = %config.wal_dir,
265 storage = ?config.storage,
266 "starting tael server"
267 );
268
269 let grpc_handle = tokio::spawn({
270 let store = Arc::clone(&store);
271 let blobs = Arc::clone(&blobs);
272 let bus = Arc::clone(&bus);
273 let log_bus = Arc::clone(&log_bus);
274 let addr = config.otlp_grpc_addr.parse()?;
275 async move {
276 let trace_service = ingest::otlp::OtlpTraceService::new(
277 Arc::clone(&store),
278 Arc::clone(&blobs),
279 search.clone(),
280 bus,
281 );
282 let logs_service = ingest::otlp_logs::OtlpLogsService::new(
283 Arc::clone(&store),
284 Arc::clone(&blobs),
285 log_bus,
286 );
287 let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
288 TonicServer::builder()
289 .add_service(
290 opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
291 )
292 .add_service(
293 opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
294 )
295 .add_service(
296 opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
297 )
298 .serve_with_shutdown(addr, shutdown_signal())
299 .await
300 .expect("gRPC server failed");
301 }
302 });
303
304 let rest_handle = tokio::spawn({
305 let store = Arc::clone(&store);
306 let blobs = Arc::clone(&blobs);
307 let bus = Arc::clone(&bus);
308 let log_bus = Arc::clone(&log_bus);
309 let cluster = coordinator.clone();
310 let addr = config.rest_api_addr.clone();
311 let socket = config.rest_api_socket.clone();
312 async move {
313 let app = api::rest::router(store, blobs, bus, log_bus, cluster);
314 if let Some(socket) = socket {
315 #[cfg(unix)]
316 {
317 prepare_unix_socket_path(&socket)?;
318 let listener = tokio::net::UnixListener::bind(&socket)
319 .with_context(|| format!("binding REST Unix socket {socket}"))?;
320 tracing::info!(%socket, "REST API listening on Unix socket");
321 let result = axum::serve(listener, app)
322 .with_graceful_shutdown(shutdown_signal())
323 .await
324 .context("REST server failed");
325 cleanup_unix_socket_path(&socket);
326 result?;
327 }
328 #[cfg(not(unix))]
329 {
330 bail!("REST Unix sockets are only supported on Unix platforms");
331 }
332 } else {
333 let listener = TcpListener::bind(&addr)
334 .await
335 .with_context(|| format!("binding REST addr {addr}"))?;
336 tracing::info!(%addr, "REST API listening");
337 axum::serve(listener, app)
338 .with_graceful_shutdown(shutdown_signal())
339 .await
340 .context("REST server failed")?;
341 }
342 Ok::<(), anyhow::Error>(())
343 }
344 });
345
346 if !options.is_quiet() {
347 print_startup_banner(&config);
348 }
349
350 let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
353 grpc_res?;
354 rest_res??;
355
356 if let Err(e) = store.flush() {
359 tracing::warn!(error = %e, "flush on shutdown failed");
360 }
361 tracing::info!("tael server stopped");
362
363 Ok(())
364}
365
/// Exports `wal_dir` to the process environment as `WALRUS_DATA_DIR`.
///
/// This mutates process-global state and overwrites any existing value.
fn configure_walrus_data_dir(wal_dir: &str) {
    const WALRUS_DATA_DIR_VAR: &str = "WALRUS_DATA_DIR";

    // SAFETY: `set_var` is only sound while no other thread reads or writes the
    // environment concurrently. NOTE(review): the visible caller runs this at the
    // top of `run_with_options`, before that function spawns tasks, but other
    // runtime threads may already exist — confirm nothing touches the env then.
    unsafe { std::env::set_var(WALRUS_DATA_DIR_VAR, wal_dir) }
}
373
374#[cfg(unix)]
375fn prepare_unix_socket_path(socket: &str) -> Result<()> {
376 use std::os::unix::fs::FileTypeExt;
377
378 let path = std::path::Path::new(socket);
379 if let Some(parent) = path.parent()
380 && !parent.as_os_str().is_empty()
381 {
382 std::fs::create_dir_all(parent)
383 .with_context(|| format!("creating REST socket directory {}", parent.display()))?;
384 }
385
386 match std::fs::symlink_metadata(path) {
387 Ok(meta) if meta.file_type().is_socket() => {
388 bail!(
389 "REST Unix socket path already exists: {}. Remove it if no server is running.",
390 path.display()
391 );
392 }
393 Ok(_) => {
394 bail!(
395 "REST Unix socket path exists and is not a socket: {}",
396 path.display()
397 );
398 }
399 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
400 Err(e) => Err(e).with_context(|| format!("checking REST socket path {}", path.display())),
401 }
402}
403
404#[cfg(unix)]
405fn cleanup_unix_socket_path(socket: &str) {
406 use std::os::unix::fs::FileTypeExt;
407
408 let path = std::path::Path::new(socket);
409 match std::fs::symlink_metadata(path) {
410 Ok(meta) if meta.file_type().is_socket() => {
411 if let Err(e) = std::fs::remove_file(path) {
412 tracing::warn!(socket = %path.display(), error = %e, "failed to remove REST Unix socket");
413 }
414 }
415 Ok(_) | Err(_) => {}
416 }
417}
418
419fn print_startup_banner(config: &ServerConfig) {
424 let rest = rest_endpoint_label(config);
425 let otlp = &config.otlp_grpc_addr;
426 let connect_flag = cli_connect_flag(config);
427
428 println!("tael server starting");
429 println!(" REST API {rest}");
430 println!(" OTLP gRPC {otlp}");
431 println!(" data dir {}", config.data_dir);
432 println!(" WAL dir {}", config.wal_dir);
433 println!(" storage {:?}", config.storage);
434 println!();
435 println!("Connect a CLI from this machine:");
436 println!(" tael{connect_flag} services");
437 println!(" tael{connect_flag} live");
438 println!();
439 println!("Point a service at this server (OTLP):");
440 println!(" export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
441 println!(" export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
442 println!(" export OTEL_SERVICE_NAME=<your-service>");
443 println!();
444}
445
446fn cli_connect_flag(config: &ServerConfig) -> String {
450 if let Some(socket) = &config.rest_api_socket {
451 return format!(" --unix-socket {socket}");
452 }
453
454 let rest_addr = &config.rest_api_addr;
455 let (host, port) = match rest_addr.rsplit_once(':') {
456 Some((h, p)) => (h, p),
457 None => return String::new(),
458 };
459 let local = matches!(
460 host,
461 "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]"
462 );
463 match (local, port) {
464 (true, "7701") => String::new(),
465 (true, p) => format!(" --port-rest {p}"),
466 (false, _) => format!(" --server http://{rest_addr}"),
467 }
468}
469
470fn rest_endpoint_label(config: &ServerConfig) -> String {
471 match &config.rest_api_socket {
472 Some(socket) => format!("unix://{socket}"),
473 None => format!("http://{}", config.rest_api_addr),
474 }
475}
476
477async fn shutdown_signal() {
481 let ctrl_c = async {
482 let _ = tokio::signal::ctrl_c().await;
483 };
484
485 #[cfg(unix)]
486 let terminate = async {
487 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
488 Ok(mut s) => {
489 s.recv().await;
490 }
491 Err(e) => {
492 tracing::warn!(error = %e, "failed to install SIGTERM handler");
493 std::future::pending::<()>().await;
494 }
495 }
496 };
497
498 #[cfg(not(unix))]
499 let terminate = std::future::pending::<()>();
500
501 tokio::select! {
502 _ = ctrl_c => {}
503 _ = terminate => {}
504 }
505 tracing::info!("shutdown signal received; draining listeners");
506}