1mod api;
10mod cluster;
11mod config;
12mod ingest;
13mod log_bus;
14mod promql;
15mod span_bus;
16mod storage;
17
18use std::sync::Arc;
19
20use anyhow::{Context, Result, bail};
21use tokio::net::TcpListener;
22use tonic::transport::Server as TonicServer;
23use tracing_subscriber::EnvFilter;
24
25pub use config::{ServerConfig, StorageBackend};
26pub use storage::models::{
27 LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
28 TraceQuery,
29};
30pub use storage::{
31 BlobStore, DuckDbStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
32};
33
34use log_bus::LogBus;
35use span_bus::SpanBus;
36
/// How much the server writes to the terminal while it runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerOutputMode {
    /// Install a `tracing` subscriber and print the startup banner.
    Default,
    /// Print nothing: no subscriber is installed and no banner is shown, which suits
    /// running the server embedded inside another process.
    Quiet,
}

/// Options that control how `run_with_options` behaves.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ServerRunOptions {
    /// Terminal output mode; see [`ServerOutputMode`].
    pub output: ServerOutputMode,
}

impl Default for ServerRunOptions {
    /// Options with [`ServerOutputMode::Default`] output.
    fn default() -> Self {
        let output = ServerOutputMode::Default;
        Self { output }
    }
}

impl ServerRunOptions {
    /// Options with [`ServerOutputMode::Quiet`] output (no subscriber, no banner).
    pub fn quiet() -> Self {
        let output = ServerOutputMode::Quiet;
        Self { output }
    }

    /// True when terminal output should be suppressed.
    fn is_quiet(self) -> bool {
        self.output == ServerOutputMode::Quiet
    }
}
76
77fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
83 let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
84 .ok()
85 .and_then(|s| s.parse().ok())
86 .unwrap_or(24);
87 let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
88 .ok()
89 .and_then(|s| s.parse().ok())
90 .unwrap_or(3600);
91 let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
93 .ok()
94 .and_then(|s| s.parse().ok())
95 .unwrap_or(365);
96 tokio::spawn(async move {
97 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
98 loop {
99 tick.tick().await;
100 let backend = Arc::clone(&backend);
101 let blobs = Arc::clone(&blobs);
102 let result = tokio::task::spawn_blocking(move || {
103 let now = chrono::Utc::now();
104 let hot_cutoff = now - chrono::Duration::hours(window_hours);
105 let mut compacted = backend.compact_spans(hot_cutoff)?;
106 compacted += backend.compact_logs_metrics(hot_cutoff)?;
107 let dropped =
108 backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
109 let live = backend.collect_live_blob_hashes()?;
112 let blobs_gcd = blobs.gc(&live)?;
113 anyhow::Ok((compacted, dropped, blobs_gcd))
114 })
115 .await;
116 match result {
117 Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
118 compacted = c,
119 partitions_dropped = d,
120 blobs_gcd = g,
121 "tael-backend maintenance"
122 ),
123 Ok(Ok(_)) => {}
124 Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
125 Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
126 }
127 }
128 });
129}
130
131pub async fn run(config: ServerConfig) -> Result<()> {
137 run_with_options(config, ServerRunOptions::default()).await
138}
139
140pub async fn run_embedded(config: ServerConfig) -> Result<()> {
145 run_with_options(config, ServerRunOptions::quiet()).await
146}
147
148pub async fn run_with_options(config: ServerConfig, options: ServerRunOptions) -> Result<()> {
154 if !options.is_quiet() {
158 let _ = tracing_subscriber::fmt()
159 .with_env_filter(EnvFilter::from_default_env())
160 .try_init();
161 }
162
163 configure_walrus_data_dir(&config.wal_dir);
164
165 let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
166
167 let coordinator = match &config.cluster {
170 Some(cs) => {
171 let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
172 node_id: cs.node_id.clone(),
173 listen_addr: cs
174 .listen_addr
175 .parse()
176 .context("parsing TAEL_CLUSTER_LISTEN")?,
177 advertise_addr: cs
178 .advertise_addr
179 .parse()
180 .context("parsing TAEL_CLUSTER_ADVERTISE")?,
181 seeds: cs.seeds.clone(),
182 cluster_id: cs.cluster_id.clone(),
183 })
184 .await?;
185 Some(coord)
186 }
187 None => None,
188 };
189
190 let mut search: Option<Arc<storage::SearchIndex>> = None;
193 let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
194 let shards = config
197 .query_shards
198 .iter()
199 .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
200 .collect::<Result<Vec<_>>>()?;
201 tracing::info!(
202 shards = shards.len(),
203 "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
204 );
205 Arc::new(FanoutStore::new(shards)?)
206 } else {
207 match config.storage {
208 StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
209 StorageBackend::TaelBackend => {
210 let sinks: Vec<Arc<dyn WalSink>> = config
214 .wal_standbys
215 .iter()
216 .map(|url| {
217 let sink = match &coordinator {
220 Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
221 None => RemoteWalSink::new(url),
222 };
223 sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
224 })
225 .collect::<Result<Vec<_>>>()?;
226 let backend = Arc::new(if sinks.is_empty() {
227 TaelBackend::new(&config.data_dir)?
228 } else {
229 tracing::info!(
230 standbys = sinks.len(),
231 required_acks = ?config.wal_required_acks,
232 "WAL replication enabled: shipping to standbys (leader)"
233 );
234 TaelBackend::with_wal_key_and_sinks(
235 &config.data_dir,
236 "tael-backend",
237 sinks,
238 config.wal_required_acks,
239 )?
240 });
241 search = Some(backend.search_index());
242 spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
243 backend as Arc<dyn Store>
244 }
245 }
246 };
247 let bus = Arc::new(SpanBus::new()?);
248 let log_bus = Arc::new(LogBus::new()?);
249
250 tracing::info!(
251 otlp_grpc = %config.otlp_grpc_addr,
252 rest_api = %config.rest_api_addr,
253 rest_api_socket = ?config.rest_api_socket,
254 data_dir = %config.data_dir,
255 wal_dir = %config.wal_dir,
256 storage = ?config.storage,
257 "starting tael server"
258 );
259
260 let grpc_handle = tokio::spawn({
261 let store = Arc::clone(&store);
262 let blobs = Arc::clone(&blobs);
263 let bus = Arc::clone(&bus);
264 let log_bus = Arc::clone(&log_bus);
265 let addr = config.otlp_grpc_addr.parse()?;
266 async move {
267 let trace_service = ingest::otlp::OtlpTraceService::new(
268 Arc::clone(&store),
269 Arc::clone(&blobs),
270 search.clone(),
271 bus,
272 );
273 let logs_service = ingest::otlp_logs::OtlpLogsService::new(
274 Arc::clone(&store),
275 Arc::clone(&blobs),
276 log_bus,
277 );
278 let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
279 TonicServer::builder()
280 .add_service(
281 opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
282 )
283 .add_service(
284 opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
285 )
286 .add_service(
287 opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
288 )
289 .serve_with_shutdown(addr, shutdown_signal())
290 .await
291 .expect("gRPC server failed");
292 }
293 });
294
295 let rest_handle = tokio::spawn({
296 let store = Arc::clone(&store);
297 let blobs = Arc::clone(&blobs);
298 let bus = Arc::clone(&bus);
299 let log_bus = Arc::clone(&log_bus);
300 let cluster = coordinator.clone();
301 let addr = config.rest_api_addr.clone();
302 let socket = config.rest_api_socket.clone();
303 async move {
304 let app = api::rest::router(store, blobs, bus, log_bus, cluster);
305 if let Some(socket) = socket {
306 #[cfg(unix)]
307 {
308 prepare_unix_socket_path(&socket)?;
309 let listener = tokio::net::UnixListener::bind(&socket)
310 .with_context(|| format!("binding REST Unix socket {socket}"))?;
311 tracing::info!(%socket, "REST API listening on Unix socket");
312 let result = axum::serve(listener, app)
313 .with_graceful_shutdown(shutdown_signal())
314 .await
315 .context("REST server failed");
316 cleanup_unix_socket_path(&socket);
317 result?;
318 }
319 #[cfg(not(unix))]
320 {
321 bail!("REST Unix sockets are only supported on Unix platforms");
322 }
323 } else {
324 let listener = TcpListener::bind(&addr)
325 .await
326 .with_context(|| format!("binding REST addr {addr}"))?;
327 tracing::info!(%addr, "REST API listening");
328 axum::serve(listener, app)
329 .with_graceful_shutdown(shutdown_signal())
330 .await
331 .context("REST server failed")?;
332 }
333 Ok::<(), anyhow::Error>(())
334 }
335 });
336
337 if !options.is_quiet() {
338 print_startup_banner(&config);
339 }
340
341 let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
344 grpc_res?;
345 rest_res??;
346
347 if let Err(e) = store.flush() {
350 tracing::warn!(error = %e, "flush on shutdown failed");
351 }
352 tracing::info!("tael server stopped");
353
354 Ok(())
355}
356
/// Sets the `WALRUS_DATA_DIR` environment variable to `wal_dir`.
///
/// NOTE(review): presumably this is how the WAL implementation used by the storage layer
/// learns its data directory; confirm which component reads it.
fn configure_walrus_data_dir(wal_dir: &str) {
    // SAFETY: `std::env::set_var` is only sound while no other thread reads or writes the
    // process environment. In this file it is called early in `run_with_options`, before
    // the server spawns its own tasks. That call already runs inside a Tokio runtime, whose
    // worker threads (or an embedding host) may exist at this point.
    // NOTE(review): confirm nothing touches the environment concurrently, or configure the
    // WAL directory without going through env vars.
    unsafe {
        std::env::set_var("WALRUS_DATA_DIR", wal_dir);
    }
}
364
365#[cfg(unix)]
366fn prepare_unix_socket_path(socket: &str) -> Result<()> {
367 use std::os::unix::fs::FileTypeExt;
368
369 let path = std::path::Path::new(socket);
370 if let Some(parent) = path.parent()
371 && !parent.as_os_str().is_empty()
372 {
373 std::fs::create_dir_all(parent)
374 .with_context(|| format!("creating REST socket directory {}", parent.display()))?;
375 }
376
377 match std::fs::symlink_metadata(path) {
378 Ok(meta) if meta.file_type().is_socket() => {
379 bail!(
380 "REST Unix socket path already exists: {}. Remove it if no server is running.",
381 path.display()
382 );
383 }
384 Ok(_) => {
385 bail!(
386 "REST Unix socket path exists and is not a socket: {}",
387 path.display()
388 );
389 }
390 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
391 Err(e) => Err(e).with_context(|| format!("checking REST socket path {}", path.display())),
392 }
393}
394
395#[cfg(unix)]
396fn cleanup_unix_socket_path(socket: &str) {
397 use std::os::unix::fs::FileTypeExt;
398
399 let path = std::path::Path::new(socket);
400 match std::fs::symlink_metadata(path) {
401 Ok(meta) if meta.file_type().is_socket() => {
402 if let Err(e) = std::fs::remove_file(path) {
403 tracing::warn!(socket = %path.display(), error = %e, "failed to remove REST Unix socket");
404 }
405 }
406 Ok(_) | Err(_) => {}
407 }
408}
409
410fn print_startup_banner(config: &ServerConfig) {
415 let rest = rest_endpoint_label(config);
416 let otlp = &config.otlp_grpc_addr;
417 let connect_flag = cli_connect_flag(config);
418
419 println!("tael server starting");
420 println!(" REST API {rest}");
421 println!(" OTLP gRPC {otlp}");
422 println!(" data dir {}", config.data_dir);
423 println!(" WAL dir {}", config.wal_dir);
424 println!(" storage {:?}", config.storage);
425 println!();
426 println!("Connect a CLI from this machine:");
427 println!(" tael{connect_flag} services");
428 println!(" tael{connect_flag} live");
429 println!();
430 println!("Point a service at this server (OTLP):");
431 println!(" export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
432 println!(" export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
433 println!(" export OTEL_SERVICE_NAME=<your-service>");
434 println!();
435}
436
437fn cli_connect_flag(config: &ServerConfig) -> String {
441 if let Some(socket) = &config.rest_api_socket {
442 return format!(" --unix-socket {socket}");
443 }
444
445 let rest_addr = &config.rest_api_addr;
446 let (host, port) = match rest_addr.rsplit_once(':') {
447 Some((h, p)) => (h, p),
448 None => return String::new(),
449 };
450 let local = matches!(
451 host,
452 "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]"
453 );
454 match (local, port) {
455 (true, "7701") => String::new(),
456 (true, p) => format!(" --port-rest {p}"),
457 (false, _) => format!(" --server http://{rest_addr}"),
458 }
459}
460
461fn rest_endpoint_label(config: &ServerConfig) -> String {
462 match &config.rest_api_socket {
463 Some(socket) => format!("unix://{socket}"),
464 None => format!("http://{}", config.rest_api_addr),
465 }
466}
467
468async fn shutdown_signal() {
472 let ctrl_c = async {
473 let _ = tokio::signal::ctrl_c().await;
474 };
475
476 #[cfg(unix)]
477 let terminate = async {
478 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
479 Ok(mut s) => {
480 s.recv().await;
481 }
482 Err(e) => {
483 tracing::warn!(error = %e, "failed to install SIGTERM handler");
484 std::future::pending::<()>().await;
485 }
486 }
487 };
488
489 #[cfg(not(unix))]
490 let terminate = std::future::pending::<()>();
491
492 tokio::select! {
493 _ = ctrl_c => {}
494 _ = terminate => {}
495 }
496 tracing::info!("shutdown signal received; draining listeners");
497}