1mod api;
8mod config;
9mod ingest;
10mod log_bus;
11mod promql;
12mod span_bus;
13mod storage;
14
use std::sync::Arc;

use anyhow::{Context, Result};
use tokio::net::TcpListener;
use tonic::transport::Server as TonicServer;
use tracing_subscriber::EnvFilter;

pub use config::{ServerConfig, StorageBackend};

use log_bus::LogBus;
use span_bus::SpanBus;
use storage::{BlobStore, DuckDbStore, Store, TaelBackend};
27
28fn spawn_span_compactor(backend: Arc<TaelBackend>, blobs: Arc<BlobStore>) {
34 let window_hours: i64 = std::env::var("TAEL_HOT_TIER_HOURS")
35 .ok()
36 .and_then(|s| s.parse().ok())
37 .unwrap_or(24);
38 let interval_secs: u64 = std::env::var("TAEL_COMPACT_INTERVAL_SECS")
39 .ok()
40 .and_then(|s| s.parse().ok())
41 .unwrap_or(3600);
42 let retention_days: i64 = std::env::var("TAEL_TRACE_RETENTION_DAYS")
44 .ok()
45 .and_then(|s| s.parse().ok())
46 .unwrap_or(365);
47 tokio::spawn(async move {
48 let mut tick = tokio::time::interval(std::time::Duration::from_secs(interval_secs));
49 loop {
50 tick.tick().await;
51 let backend = Arc::clone(&backend);
52 let blobs = Arc::clone(&blobs);
53 let result = tokio::task::spawn_blocking(move || {
54 let now = chrono::Utc::now();
55 let hot_cutoff = now - chrono::Duration::hours(window_hours);
56 let mut compacted = backend.compact_spans(hot_cutoff)?;
57 compacted += backend.compact_logs_metrics(hot_cutoff)?;
58 let dropped =
59 backend.enforce_span_retention(now - chrono::Duration::days(retention_days))?;
60 let live = backend.collect_live_blob_hashes()?;
63 let blobs_gcd = blobs.gc(&live)?;
64 anyhow::Ok((compacted, dropped, blobs_gcd))
65 })
66 .await;
67 match result {
68 Ok(Ok((c, d, g))) if c > 0 || d > 0 || g > 0 => tracing::info!(
69 compacted = c,
70 partitions_dropped = d,
71 blobs_gcd = g,
72 "tael-backend maintenance"
73 ),
74 Ok(Ok(_)) => {}
75 Ok(Err(e)) => tracing::warn!(error = %e, "maintenance failed"),
76 Err(e) => tracing::warn!(error = %e, "maintenance task panicked"),
77 }
78 }
79 });
80}
81
82pub async fn run(config: ServerConfig) -> Result<()> {
86 let _ = tracing_subscriber::fmt()
89 .with_env_filter(EnvFilter::from_default_env())
90 .try_init();
91
92 let blobs = Arc::new(BlobStore::new(&config.data_dir)?);
93 let mut search: Option<Arc<storage::SearchIndex>> = None;
96 let store: Arc<dyn Store> = match config.storage {
97 StorageBackend::Duckdb => Arc::new(DuckDbStore::new(&config.data_dir)?),
98 StorageBackend::TaelBackend => {
99 let backend = Arc::new(TaelBackend::new(&config.data_dir)?);
100 search = Some(backend.search_index());
101 spawn_span_compactor(Arc::clone(&backend), Arc::clone(&blobs));
102 backend as Arc<dyn Store>
103 }
104 };
105 let bus = Arc::new(SpanBus::new()?);
106 let log_bus = Arc::new(LogBus::new()?);
107
108 tracing::info!(
109 otlp_grpc = %config.otlp_grpc_addr,
110 rest_api = %config.rest_api_addr,
111 data_dir = %config.data_dir,
112 storage = ?config.storage,
113 "starting tael server"
114 );
115
116 let grpc_handle = tokio::spawn({
117 let store = Arc::clone(&store);
118 let blobs = Arc::clone(&blobs);
119 let bus = Arc::clone(&bus);
120 let log_bus = Arc::clone(&log_bus);
121 let addr = config.otlp_grpc_addr.parse()?;
122 async move {
123 let trace_service = ingest::otlp::OtlpTraceService::new(
124 Arc::clone(&store),
125 Arc::clone(&blobs),
126 search.clone(),
127 bus,
128 );
129 let logs_service = ingest::otlp_logs::OtlpLogsService::new(
130 Arc::clone(&store),
131 Arc::clone(&blobs),
132 log_bus,
133 );
134 let metrics_service =
135 ingest::otlp_metrics::OtlpMetricsService::new(store);
136 TonicServer::builder()
137 .add_service(
138 opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer::new(trace_service),
139 )
140 .add_service(
141 opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer::new(logs_service),
142 )
143 .add_service(
144 opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer::new(metrics_service),
145 )
146 .serve(addr)
147 .await
148 .expect("gRPC server failed");
149 }
150 });
151
152 let rest_handle = tokio::spawn({
153 let store = Arc::clone(&store);
154 let blobs = Arc::clone(&blobs);
155 let bus = Arc::clone(&bus);
156 let log_bus = Arc::clone(&log_bus);
157 let addr = config.rest_api_addr.clone();
158 async move {
159 let app = api::rest::router(store, blobs, bus, log_bus);
160 let listener = TcpListener::bind(&addr).await.expect("failed to bind REST addr");
161 tracing::info!(%addr, "REST API listening");
162 axum::serve(listener, app).await.expect("REST server failed");
163 }
164 });
165
166 tokio::select! {
167 r = grpc_handle => r?,
168 r = rest_handle => r?,
169 }
170
171 Ok(())
172}