// devrig/otel/mod.rs

1pub mod query;
2pub mod receiver_grpc;
3pub mod receiver_http;
4pub mod storage;
5pub mod types;
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::{broadcast, RwLock};
11use tokio_util::sync::CancellationToken;
12use tracing::{info, warn};
13
14use crate::config::model::OtelConfig;
15use types::TelemetryEvent;
16
17use self::storage::TelemetryStore;
18
19/// Coordinator for OTLP gRPC + HTTP receivers and telemetry storage.
20pub struct OtelCollector {
21    store: Arc<RwLock<TelemetryStore>>,
22    events_tx: broadcast::Sender<TelemetryEvent>,
23    grpc_port: u16,
24    http_port: u16,
25}
26
27impl OtelCollector {
28    pub fn new(otel_config: &OtelConfig) -> Self {
29        let retention = humantime::parse_duration(&otel_config.retention)
30            .unwrap_or_else(|_| Duration::from_secs(3600));
31
32        let store = Arc::new(RwLock::new(TelemetryStore::new(
33            otel_config.trace_buffer,
34            otel_config.log_buffer,
35            otel_config.metric_buffer,
36            retention,
37        )));
38
39        let (events_tx, _) = broadcast::channel(1024);
40
41        Self {
42            store,
43            events_tx,
44            grpc_port: otel_config.grpc_port,
45            http_port: otel_config.http_port,
46        }
47    }
48
49    pub fn store(&self) -> Arc<RwLock<TelemetryStore>> {
50        Arc::clone(&self.store)
51    }
52
53    pub fn events_tx(&self) -> broadcast::Sender<TelemetryEvent> {
54        self.events_tx.clone()
55    }
56
57    /// Start the OTLP gRPC and HTTP receivers as background tasks.
58    /// Also starts a background sweeper for expired telemetry.
59    pub async fn start(&self, cancel: CancellationToken) -> anyhow::Result<()> {
60        // Start gRPC OTLP receiver
61        let grpc_store = Arc::clone(&self.store);
62        let grpc_tx = self.events_tx.clone();
63        let grpc_port = self.grpc_port;
64        let grpc_cancel = cancel.clone();
65        tokio::spawn(async move {
66            if let Err(e) =
67                receiver_grpc::start_grpc_server(grpc_port, grpc_store, grpc_tx, grpc_cancel).await
68            {
69                warn!(error = %e, "OTLP gRPC server failed");
70            }
71        });
72        info!(port = self.grpc_port, "OTLP gRPC receiver started");
73
74        // Start HTTP OTLP receiver
75        let http_store = Arc::clone(&self.store);
76        let http_tx = self.events_tx.clone();
77        let http_port = self.http_port;
78        let http_cancel = cancel.clone();
79        tokio::spawn(async move {
80            if let Err(e) =
81                receiver_http::start_http_otlp_server(http_port, http_store, http_tx, http_cancel)
82                    .await
83            {
84                warn!(error = %e, "OTLP HTTP server failed");
85            }
86        });
87        info!(port = self.http_port, "OTLP HTTP receiver started");
88
89        // Background sweeper for expired telemetry
90        let sweep_store = Arc::clone(&self.store);
91        let sweep_cancel = cancel.clone();
92        tokio::spawn(async move {
93            loop {
94                tokio::select! {
95                    _ = tokio::time::sleep(Duration::from_secs(30)) => {
96                        let mut store = sweep_store.write().await;
97                        store.sweep_expired();
98                    }
99                    _ = sweep_cancel.cancelled() => break,
100                }
101            }
102        });
103
104        Ok(())
105    }
106}