1mod api;
8mod cluster;
9mod config;
10mod ingest;
11mod log_bus;
12mod promql;
13mod span_bus;
14mod storage;
15
16use std::sync::Arc;
17
18use anyhow::{Context, Result};
19use tokio::net::TcpListener;
20use tonic::transport::Server as TonicServer;
21use tracing_subscriber::EnvFilter;
22
23pub use config::{ServerConfig, StorageBackend};
24
25use log_bus::LogBus;
26use span_bus::SpanBus;
27use storage::{
28 BlobStore, DuckDbStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
29};
30
31fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
37 let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
38 .ok()
39 .and_then(|s| s.parse().ok())
40 .unwrap_or(24);
41 let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
42 .ok()
43 .and_then(|s| s.parse().ok())
44 .unwrap_or(3600);
45 let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
47 .ok()
48 .and_then(|s| s.parse().ok())
49 .unwrap_or(365);
50 tokio::spawn(async move {
51 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
52 loop {
53 tick.tick().await;
54 let backend = Arc::clone(&backend);
55 let blobs = Arc::clone(&blobs);
56 let result = tokio::task::spawn_blocking(move || {
57 let now = chrono::Utc::now();
58 let hot_cutoff = now - chrono::Duration::hours(window_hours);
59 let mut compacted = backend.compact_spans(hot_cutoff)?;
60 compacted += backend.compact_logs_metrics(hot_cutoff)?;
61 let dropped =
62 backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
63 let live = backend.collect_live_blob_hashes()?;
66 let blobs_gcd = blobs.gc(&live)?;
67 anyhow::Ok((compacted, dropped, blobs_gcd))
68 })
69 .await;
70 match result {
71 Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
72 compacted = c,
73 partitions_dropped = d,
74 blobs_gcd = g,
75 "tael-backend maintenance"
76 ),
77 Ok(Ok(_)) => {}
78 Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
79 Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
80 }
81 }
82 });
83}
84
85pub async fn run(config: ServerConfig) -> Result<()> {
89 let _ = tracing_subscriber::fmt()
92 .with_env_filter(EnvFilter::from_default_env())
93 .try_init();
94
95 let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
96
97 let coordinator = match &config.cluster {
100 Some(cs) => {
101 let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
102 node_id: cs.node_id.clone(),
103 listen_addr: cs.listen_addr.parse().context("parsing TAEL_CLUSTER_LISTEN")?,
104 advertise_addr: cs
105 .advertise_addr
106 .parse()
107 .context("parsing TAEL_CLUSTER_ADVERTISE")?,
108 seeds: cs.seeds.clone(),
109 cluster_id: cs.cluster_id.clone(),
110 })
111 .await?;
112 Some(coord)
113 }
114 None => None,
115 };
116
117 let mut search: Option<Arc<storage::SearchIndex>> = None;
120 let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
121 let shards = config
124 .query_shards
125 .iter()
126 .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
127 .collect::<Result<Vec<_>>>()?;
128 tracing::info!(
129 shards = shards.len(),
130 "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
131 );
132 Arc::new(FanoutStore::new(shards)?)
133 } else {
134 match config.storage {
135 StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
136 StorageBackend::TaelBackend => {
137 let sinks: Vec<Arc<dyn WalSink>> = config
141 .wal_standbys
142 .iter()
143 .map(|url| {
144 let sink = match &coordinator {
147 Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
148 None => RemoteWalSink::new(url),
149 };
150 sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
151 })
152 .collect::<Result<Vec<_>>>()?;
153 let backend = Arc::new(if sinks.is_empty() {
154 TaelBackend::new(&config.data_dir)?
155 } else {
156 tracing::info!(
157 standbys = sinks.len(),
158 required_acks = ?config.wal_required_acks,
159 "WAL replication enabled: shipping to standbys (leader)"
160 );
161 TaelBackend::with_wal_key_and_sinks(
162 &config.data_dir,
163 "tael-backend",
164 sinks,
165 config.wal_required_acks,
166 )?
167 });
168 search = Some(backend.search_index());
169 spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
170 backend as Arc<dyn Store>
171 }
172 }
173 };
174 let bus = Arc::new(SpanBus::new()?);
175 let log_bus = Arc::new(LogBus::new()?);
176
177 tracing::info!(
178 otlp_grpc = %config.otlp_grpc_addr,
179 rest_api = %config.rest_api_addr,
180 data_dir = %config.data_dir,
181 storage = ?config.storage,
182 "starting tael server"
183 );
184
185 let grpc_handle = tokio::spawn({
186 let store = Arc::clone(&store);
187 let blobs = Arc::clone(&blobs);
188 let bus = Arc::clone(&bus);
189 let log_bus = Arc::clone(&log_bus);
190 let addr = config.otlp_grpc_addr.parse()?;
191 async move {
192 let trace_service = ingest::otlp::OtlpTraceService::new(
193 Arc::clone(&store),
194 Arc::clone(&blobs),
195 search.clone(),
196 bus,
197 );
198 let logs_service = ingest::otlp_logs::OtlpLogsService::new(
199 Arc::clone(&store),
200 Arc::clone(&blobs),
201 log_bus,
202 );
203 let metrics_service =
204 ingest::otlp_metrics::OtlpMetricsService::new(store);
205 TonicServer::builder()
206 .add_service(
207 opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
208 )
209 .add_service(
210 opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
211 )
212 .add_service(
213 opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
214 )
215 .serve_with_shutdown(addr, shutdown_signal())
216 .await
217 .expect("gRPC server failed");
218 }
219 });
220
221 let rest_handle = tokio::spawn({
222 let store = Arc::clone(&store);
223 let blobs = Arc::clone(&blobs);
224 let bus = Arc::clone(&bus);
225 let log_bus = Arc::clone(&log_bus);
226 let cluster = coordinator.clone();
227 let addr = config.rest_api_addr.clone();
228 async move {
229 let app = api::rest::router(store, blobs, bus, log_bus, cluster);
230 let listener = TcpListener::bind(&addr).await.expect("failed to bind REST addr");
231 tracing::info!(%addr, "REST API listening");
232 axum::serve(listener, app)
233 .with_graceful_shutdown(shutdown_signal())
234 .await
235 .expect("REST server failed");
236 }
237 });
238
239 print_startup_banner(&config);
240
241 let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
244 grpc_res?;
245 rest_res?;
246
247 if let Err(e) = store.flush() {
250 tracing::warn!(error = %e, "flush on shutdown failed");
251 }
252 tracing::info!("tael server stopped");
253
254 Ok(())
255}
256
257fn print_startup_banner(config: &ServerConfig) {
262 let rest = &config.rest_api_addr;
263 let otlp = &config.otlp_grpc_addr;
264 let connect_flag = cli_connect_flag(rest);
265
266 println!("tael server starting");
267 println!(" REST API http://{rest}");
268 println!(" OTLP gRPC {otlp}");
269 println!(" data dir {}", config.data_dir);
270 println!(" storage {:?}", config.storage);
271 println!();
272 println!("Connect a CLI from this machine:");
273 println!(" tael{connect_flag} services");
274 println!(" tael{connect_flag} live");
275 println!();
276 println!("Point a service at this server (OTLP):");
277 println!(" export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
278 println!(" export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
279 println!(" export OTEL_SERVICE_NAME=<your-service>");
280 println!();
281}
282
/// Builds the flag suffix a local CLI needs to reach the REST API at `rest_addr`.
///
/// The address is split at its last `:` into host and port:
/// - no `:` at all → empty string;
/// - loopback or wildcard host on port `7701` → empty string;
/// - loopback or wildcard host on another port → `" --port <port>"`;
/// - any other host → `" --server http://<rest_addr>"`.
///
/// Wildcard bind addresses count as local because a CLI on the same machine
/// reaches them through loopback. This covers the IPv6 wildcard (`::`,
/// `[::]`) as well as `0.0.0.0`.
///
/// A non-empty result starts with a space so it can be appended directly
/// after the binary name.
fn cli_connect_flag(rest_addr: &str) -> String {
    let Some((host, port)) = rest_addr.rsplit_once(':') else {
        return String::new();
    };
    let local = matches!(
        host,
        "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]" | "::" | "[::]"
    );
    match (local, port) {
        (true, "7701") => String::new(),
        (true, p) => format!(" --port {p}"),
        (false, _) => format!(" --server http://{rest_addr}"),
    }
}
298
299async fn shutdown_signal() {
303 let ctrl_c = async {
304 let _ = tokio::signal::ctrl_c().await;
305 };
306
307 #[cfg(unix)]
308 let terminate = async {
309 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
310 Ok(mut s) => {
311 s.recv().await;
312 }
313 Err(e) => {
314 tracing::warn!(error = %e, "failed to install SIGTERM handler");
315 std::future::pending::<()>().await;
316 }
317 }
318 };
319
320 #[cfg(not(unix))]
321 let terminate = std::future::pending::<()>();
322
323 tokio::select! {
324 _ = ctrl_c => {}
325 _ = terminate => {}
326 }
327 tracing::info!("shutdown signal received; draining listeners");
328}