1mod api;
8mod cluster;
9mod config;
10mod ingest;
11mod log_bus;
12mod promql;
13mod span_bus;
14mod storage;
15
16use std::sync::Arc;
17
18use anyhow::{Context, Result};
19use tokio::net::TcpListener;
20use tonic::transport::Server as TonicServer;
21use tracing_subscriber::EnvFilter;
22
23pub use config::{ServerConfig, StorageBackend};
24pub use storage::models::{
25 LogRecord, LogSeverity, MetricPoint, MetricType, Span, SpanEvent, SpanKind, SpanStatus,
26 TraceQuery,
27};
28pub use storage::{
29 BlobStore, DuckDbStore, FanoutStore, RemoteStore, RemoteWalSink, Store, TaelBackend, WalSink,
30};
31
32use log_bus::LogBus;
33use span_bus::SpanBus;
34
35fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
41 let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
42 .ok()
43 .and_then(|s| s.parse().ok())
44 .unwrap_or(24);
45 let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
46 .ok()
47 .and_then(|s| s.parse().ok())
48 .unwrap_or(3600);
49 let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
51 .ok()
52 .and_then(|s| s.parse().ok())
53 .unwrap_or(365);
54 tokio::spawn(async move {
55 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
56 loop {
57 tick.tick().await;
58 let backend = Arc::clone(&backend);
59 let blobs = Arc::clone(&blobs);
60 let result = tokio::task::spawn_blocking(move || {
61 let now = chrono::Utc::now();
62 let hot_cutoff = now - chrono::Duration::hours(window_hours);
63 let mut compacted = backend.compact_spans(hot_cutoff)?;
64 compacted += backend.compact_logs_metrics(hot_cutoff)?;
65 let dropped =
66 backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
67 let live = backend.collect_live_blob_hashes()?;
70 let blobs_gcd = blobs.gc(&live)?;
71 anyhow::Ok((compacted, dropped, blobs_gcd))
72 })
73 .await;
74 match result {
75 Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
76 compacted = c,
77 partitions_dropped = d,
78 blobs_gcd = g,
79 "tael-backend maintenance"
80 ),
81 Ok(Ok(_)) => {}
82 Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
83 Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
84 }
85 }
86 });
87}
88
89pub async fn run(config: ServerConfig) -> Result<()> {
93 let _ = tracing_subscriber::fmt()
96 .with_env_filter(EnvFilter::from_default_env())
97 .try_init();
98
99 configure_walrus_data_dir(&config.wal_dir);
100
101 let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
102
103 let coordinator = match &config.cluster {
106 Some(cs) => {
107 let coord = cluster::ClusterCoordinator::start(cluster::ClusterConfig {
108 node_id: cs.node_id.clone(),
109 listen_addr: cs
110 .listen_addr
111 .parse()
112 .context("parsing TAEL_CLUSTER_LISTEN")?,
113 advertise_addr: cs
114 .advertise_addr
115 .parse()
116 .context("parsing TAEL_CLUSTER_ADVERTISE")?,
117 seeds: cs.seeds.clone(),
118 cluster_id: cs.cluster_id.clone(),
119 })
120 .await?;
121 Some(coord)
122 }
123 None => None,
124 };
125
126 let mut search: Option<Arc<storage::SearchIndex>> = None;
129 let store: Arc<dyn Store> = if !config.query_shards.is_empty() {
130 let shards = config
133 .query_shards
134 .iter()
135 .map(|url| RemoteStore::new(url).map(|s| Arc::new(s) as Arc<dyn Store>))
136 .collect::<Result<Vec<_>>>()?;
137 tracing::info!(
138 shards = shards.len(),
139 "query fan-out mode: reads scatter-gather across remote shards (no local engine)"
140 );
141 Arc::new(FanoutStore::new(shards)?)
142 } else {
143 match config.storage {
144 StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
145 StorageBackend::TaelBackend => {
146 let sinks: Vec<Arc<dyn WalSink>> = config
150 .wal_standbys
151 .iter()
152 .map(|url| {
153 let sink = match &coordinator {
156 Some(c) => RemoteWalSink::with_epoch(url, c.leader_epoch_handle()),
157 None => RemoteWalSink::new(url),
158 };
159 sink.map(|s| Arc::new(s) as Arc<dyn WalSink>)
160 })
161 .collect::<Result<Vec<_>>>()?;
162 let backend = Arc::new(if sinks.is_empty() {
163 TaelBackend::new(&config.data_dir)?
164 } else {
165 tracing::info!(
166 standbys = sinks.len(),
167 required_acks = ?config.wal_required_acks,
168 "WAL replication enabled: shipping to standbys (leader)"
169 );
170 TaelBackend::with_wal_key_and_sinks(
171 &config.data_dir,
172 "tael-backend",
173 sinks,
174 config.wal_required_acks,
175 )?
176 });
177 search = Some(backend.search_index());
178 spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
179 backend as Arc<dyn Store>
180 }
181 }
182 };
183 let bus = Arc::new(SpanBus::new()?);
184 let log_bus = Arc::new(LogBus::new()?);
185
186 tracing::info!(
187 otlp_grpc = %config.otlp_grpc_addr,
188 rest_api = %config.rest_api_addr,
189 data_dir = %config.data_dir,
190 wal_dir = %config.wal_dir,
191 storage = ?config.storage,
192 "starting tael server"
193 );
194
195 let grpc_handle = tokio::spawn({
196 let store = Arc::clone(&store);
197 let blobs = Arc::clone(&blobs);
198 let bus = Arc::clone(&bus);
199 let log_bus = Arc::clone(&log_bus);
200 let addr = config.otlp_grpc_addr.parse()?;
201 async move {
202 let trace_service = ingest::otlp::OtlpTraceService::new(
203 Arc::clone(&store),
204 Arc::clone(&blobs),
205 search.clone(),
206 bus,
207 );
208 let logs_service = ingest::otlp_logs::OtlpLogsService::new(
209 Arc::clone(&store),
210 Arc::clone(&blobs),
211 log_bus,
212 );
213 let metrics_service = ingest::otlp_metrics::OtlpMetricsService::new(store);
214 TonicServer::builder()
215 .add_service(
216 opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
217 )
218 .add_service(
219 opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
220 )
221 .add_service(
222 opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
223 )
224 .serve_with_shutdown(addr, shutdown_signal())
225 .await
226 .expect("gRPC server failed");
227 }
228 });
229
230 let rest_handle = tokio::spawn({
231 let store = Arc::clone(&store);
232 let blobs = Arc::clone(&blobs);
233 let bus = Arc::clone(&bus);
234 let log_bus = Arc::clone(&log_bus);
235 let cluster = coordinator.clone();
236 let addr = config.rest_api_addr.clone();
237 async move {
238 let app = api::rest::router(store, blobs, bus, log_bus, cluster);
239 let listener = TcpListener::bind(&addr)
240 .await
241 .expect("failed to bind REST addr");
242 tracing::info!(%addr, "REST API listening");
243 axum::serve(listener, app)
244 .with_graceful_shutdown(shutdown_signal())
245 .await
246 .expect("REST server failed");
247 }
248 });
249
250 print_startup_banner(&config);
251
252 let (grpc_res, rest_res) = tokio::join!(grpc_handle, rest_handle);
255 grpc_res?;
256 rest_res?;
257
258 if let Err(e) = store.flush() {
261 tracing::warn!(error = %e, "flush on shutdown failed");
262 }
263 tracing::info!("tael server stopped");
264
265 Ok(())
266}
267
/// Points the walrus WAL library at `wal_dir` by setting `WALRUS_DATA_DIR` in the process
/// environment.
///
/// The write is skipped when the variable already holds `wal_dir`. This avoids an unnecessary
/// (unsafe) environment mutation.
fn configure_walrus_data_dir(wal_dir: &str) {
    if std::env::var_os("WALRUS_DATA_DIR").as_deref() == Some(std::ffi::OsStr::new(wal_dir)) {
        return;
    }
    // SAFETY: `set_var` is only sound while no other thread reads or writes the process
    // environment concurrently.
    // NOTE(review): the caller `run` is async, so this executes on an already-running Tokio
    // runtime whose worker threads exist. Soundness rests on nothing else touching the
    // environment this early — confirm, or move this call before the runtime is built.
    unsafe {
        std::env::set_var("WALRUS_DATA_DIR", wal_dir);
    }
}
275
276fn print_startup_banner(config: &ServerConfig) {
281 let rest = &config.rest_api_addr;
282 let otlp = &config.otlp_grpc_addr;
283 let connect_flag = cli_connect_flag(rest);
284
285 println!("tael server starting");
286 println!(" REST API http://{rest}");
287 println!(" OTLP gRPC {otlp}");
288 println!(" data dir {}", config.data_dir);
289 println!(" WAL dir {}", config.wal_dir);
290 println!(" storage {:?}", config.storage);
291 println!();
292 println!("Connect a CLI from this machine:");
293 println!(" tael{connect_flag} services");
294 println!(" tael{connect_flag} live");
295 println!();
296 println!("Point a service at this server (OTLP):");
297 println!(" export OTEL_EXPORTER_OTLP_ENDPOINT=http://{otlp}");
298 println!(" export OTEL_EXPORTER_OTLP_PROTOCOL=grpc");
299 println!(" export OTEL_SERVICE_NAME=<your-service>");
300 println!();
301}
302
/// Returns the extra arguments (with a leading space) a `tael` CLI on this machine needs to
/// reach a REST API listening on `rest_addr`:
/// - loopback or wildcard host on the default port → nothing;
/// - loopback or wildcard host on another port → `--port-rest <port>`;
/// - any other host → `--server http://<rest_addr>`.
///
/// An address without a `:` also yields an empty string.
fn cli_connect_flag(rest_addr: &str) -> String {
    const DEFAULT_REST_PORT: &str = "7701";

    let Some((host, port)) = rest_addr.rsplit_once(':') else {
        return String::new();
    };

    let reachable_locally = matches!(
        host,
        "127.0.0.1" | "localhost" | "0.0.0.0" | "::1" | "[::1]"
    );

    if !reachable_locally {
        format!(" --server http://{rest_addr}")
    } else if port == DEFAULT_REST_PORT {
        String::new()
    } else {
        format!(" --port-rest {port}")
    }
}
321
322async fn shutdown_signal() {
326 let ctrl_c = async {
327 let _ = tokio::signal::ctrl_c().await;
328 };
329
330 #[cfg(unix)]
331 let terminate = async {
332 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
333 Ok(mut s) => {
334 s.recv().await;
335 }
336 Err(e) => {
337 tracing::warn!(error = %e, "failed to install SIGTERM handler");
338 std::future::pending::<()>().await;
339 }
340 }
341 };
342
343 #[cfg(not(unix))]
344 let terminate = std::future::pending::<()>();
345
346 tokio::select! {
347 _ = ctrl_c => {}
348 _ = terminate => {}
349 }
350 tracing::info!("shutdown signal received; draining listeners");
351}