1pub mod query;
2pub mod receiver_grpc;
3pub mod receiver_http;
4pub mod storage;
5pub mod types;
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::{broadcast, RwLock};
11use tokio_util::sync::CancellationToken;
12use tracing::{info, warn};
13
14use crate::config::model::OtelConfig;
15use types::TelemetryEvent;
16
17use self::storage::TelemetryStore;
18
19pub struct OtelCollector {
21 store: Arc<RwLock<TelemetryStore>>,
22 events_tx: broadcast::Sender<TelemetryEvent>,
23 grpc_port: u16,
24 http_port: u16,
25}
26
27impl OtelCollector {
28 pub fn new(otel_config: &OtelConfig) -> Self {
29 let retention = humantime::parse_duration(&otel_config.retention)
30 .unwrap_or_else(|_| Duration::from_secs(3600));
31
32 let store = Arc::new(RwLock::new(TelemetryStore::new(
33 otel_config.trace_buffer,
34 otel_config.log_buffer,
35 otel_config.metric_buffer,
36 retention,
37 )));
38
39 let (events_tx, _) = broadcast::channel(1024);
40
41 Self {
42 store,
43 events_tx,
44 grpc_port: otel_config.grpc_port,
45 http_port: otel_config.http_port,
46 }
47 }
48
49 pub fn store(&self) -> Arc<RwLock<TelemetryStore>> {
50 Arc::clone(&self.store)
51 }
52
53 pub fn events_tx(&self) -> broadcast::Sender<TelemetryEvent> {
54 self.events_tx.clone()
55 }
56
57 pub async fn start(&self, cancel: CancellationToken) -> anyhow::Result<()> {
60 let grpc_store = Arc::clone(&self.store);
62 let grpc_tx = self.events_tx.clone();
63 let grpc_port = self.grpc_port;
64 let grpc_cancel = cancel.clone();
65 tokio::spawn(async move {
66 if let Err(e) =
67 receiver_grpc::start_grpc_server(grpc_port, grpc_store, grpc_tx, grpc_cancel).await
68 {
69 warn!(error = %e, "OTLP gRPC server failed");
70 }
71 });
72 info!(port = self.grpc_port, "OTLP gRPC receiver started");
73
74 let http_store = Arc::clone(&self.store);
76 let http_tx = self.events_tx.clone();
77 let http_port = self.http_port;
78 let http_cancel = cancel.clone();
79 tokio::spawn(async move {
80 if let Err(e) =
81 receiver_http::start_http_otlp_server(http_port, http_store, http_tx, http_cancel)
82 .await
83 {
84 warn!(error = %e, "OTLP HTTP server failed");
85 }
86 });
87 info!(port = self.http_port, "OTLP HTTP receiver started");
88
89 let sweep_store = Arc::clone(&self.store);
91 let sweep_cancel = cancel.clone();
92 tokio::spawn(async move {
93 loop {
94 tokio::select! {
95 _ = tokio::time::sleep(Duration::from_secs(30)) => {
96 let mut store = sweep_store.write().await;
97 store.sweep_expired();
98 }
99 _ = sweep_cancel.cancelled() => break,
100 }
101 }
102 });
103
104 Ok(())
105 }
106}