faucet_server/telemetry/
mod.rs

1use std::{path::Path, pin::pin, str::FromStr, sync::OnceLock};
2
3use chrono::{DateTime, Local};
4use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
5use std::io::Write;
6use tokio::{
7    sync::mpsc::{UnboundedReceiver, UnboundedSender},
8    task::JoinHandle,
9};
10
11use crate::{
12    cli::PgSslMode,
13    error::FaucetResult,
14    leak,
15    server::{logging::EventLogData, HttpLogData, LogOption},
16    shutdown::ShutdownSignal,
17};
18
19#[derive(Clone, Debug)]
20pub struct TelemetrySender {
21    pub sender_http_events: UnboundedSender<(chrono::DateTime<Local>, HttpLogData)>,
22    pub sender_log_events: UnboundedSender<(chrono::DateTime<Local>, EventLogData)>,
23}
24
25impl TelemetrySender {
26    pub fn send_http_event(&self, data: HttpLogData) {
27        let timestamp = chrono::Local::now();
28        let _ = self.sender_http_events.send((timestamp, data));
29    }
30    pub fn send_log_event(&self, data: EventLogData) {
31        let timestamp = chrono::Local::now();
32        let _ = self.sender_log_events.send((timestamp, data));
33    }
34}
35
36pub struct TelemetryManager {
37    _pool: deadpool_postgres::Pool,
38    pub http_events_join_handle: JoinHandle<()>,
39    pub log_events_join_handle: JoinHandle<()>,
40}
41
42fn make_tls(
43    sslmode: PgSslMode,
44    sslcert: Option<&Path>,
45) -> tokio_postgres_rustls::MakeRustlsConnect {
46    let mut root_store = rustls::RootCertStore::empty();
47
48    if matches!(sslmode, PgSslMode::VerifyCa | PgSslMode::VerifyFull) {
49        match sslcert {
50            Some(cert_path) => {
51                let mut reader =
52                    std::io::BufReader::new(std::fs::File::open(cert_path).unwrap_or_else(|e| {
53                        panic!("Failed to open certificate file '{:?}': {}", cert_path, e)
54                    }));
55                if let Ok(certs) = rustls_pemfile::certs(&mut reader) {
56                    for cert in certs {
57                        if let Err(e) = root_store.add(cert.clone().into()) {
58                            log::error!("Failed to add PEM certificate: {}", e);
59                        }
60                    }
61                }
62            }
63            None => panic!(
64                "Specified {} but did not provide a certificate path.",
65                sslmode.as_str()
66            ),
67        }
68    }
69
70    let config = rustls::ClientConfig::builder()
71        .with_root_certificates(root_store)
72        .with_no_client_auth();
73
74    tokio_postgres_rustls::MakeRustlsConnect::new(config)
75}
76
77type PgType = tokio_postgres::types::Type;
78
79static TELEMETRY_SENDER: OnceLock<TelemetrySender> = OnceLock::new();
80
81pub fn send_http_event(http_event: HttpLogData) {
82    if let Some(sender) = TELEMETRY_SENDER.get() {
83        sender.send_http_event(http_event);
84    }
85}
86
87pub fn send_log_event(http_event: EventLogData) {
88    if let Some(sender) = TELEMETRY_SENDER.get() {
89        sender.send_log_event(http_event);
90    }
91}
92
93impl TelemetryManager {
94    pub fn start(
95        namespace: &str,
96        version: Option<&str>,
97        database_url: &str,
98        sslmode: PgSslMode,
99        sslcert: Option<&Path>,
100        shutdown_signal: &'static ShutdownSignal,
101    ) -> FaucetResult<TelemetryManager> {
102        log::debug!("Connecting to PostgreSQL with params: namespace='{}', version='{:?}', database_url='[REDACTED]'", namespace, version);
103        let namespace = leak!(namespace) as &'static str;
104        let version = version.map(|v| leak!(v) as &'static str);
105
106        let config = tokio_postgres::Config::from_str(database_url)?;
107        let mgr_config = ManagerConfig {
108            recycling_method: RecyclingMethod::Fast,
109        };
110        let mgr = Manager::from_config(config, make_tls(sslmode, sslcert), mgr_config);
111        let pool = Pool::builder(mgr).max_size(10).build()?;
112
113        let (
114            sender_http_events,
115            sender_log_events,
116            http_events_join_handle,
117            log_events_join_handle,
118        ) = handle_http_events(pool.clone(), namespace, version, shutdown_signal);
119
120        let sender = TelemetrySender {
121            sender_http_events,
122            sender_log_events,
123        };
124
125        TELEMETRY_SENDER
126            .set(sender)
127            .expect("Unable to set telemetry sender. This is a bug! Report it!");
128
129        Ok(TelemetryManager {
130            _pool: pool,
131            http_events_join_handle,
132            log_events_join_handle,
133        })
134    }
135}
136
137fn spawn_events_task(
138    mut event_rx: UnboundedReceiver<(chrono::DateTime<Local>, EventLogData)>,
139    pool: Pool,
140    namespace: &'static str,
141    version: Option<&'static str>,
142    shutdown_signal: &'static ShutdownSignal,
143) -> JoinHandle<()> {
144    tokio::task::spawn(async move {
145        let types = &[
146            PgType::TEXT,        // Namespace
147            PgType::TEXT,        // Version
148            PgType::TEXT,        // Target
149            PgType::TIMESTAMPTZ, // Timestamp
150            PgType::UUID,        // Event_Id
151            PgType::UUID,        // Parent_Event_Id
152            PgType::TEXT,        // Level
153            PgType::TEXT,        // Event Type
154            PgType::TEXT,        // Message
155            PgType::JSONB,       // Body
156        ];
157        let mut logs_buffer = Vec::with_capacity(100);
158
159        'recv: loop {
160            tokio::select! {
161                    _ = shutdown_signal.wait() => break 'recv,
162                    received = event_rx.recv_many(&mut logs_buffer, 100)  => {
163                        if received == 0 {
164                            break 'recv;
165                        }
166                    let connection = match pool.get().await {
167                        Ok(conn) => conn,
168                        Err(e) => {
169                            log::error!("Unable to acquire postgresql connection: {e}");
170                            continue 'recv;
171                        }
172                    };
173                    let copy_sink_res = connection
174                        .copy_in::<_, bytes::Bytes>(
175                            "COPY faucet_log_events FROM STDIN WITH (FORMAT binary)",
176                        )
177                        .await;
178
179                    match copy_sink_res {
180                        Ok(copy_sink) => {
181                            let copy_in_writer =
182                                tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
183
184                            let mut copy_in_writer = pin!(copy_in_writer);
185
186                            log::debug!("Writing {} log events to the database", logs_buffer.len());
187
188                            'write: for (timestamp, event) in logs_buffer.drain(..) {
189                                let target = &event.target;
190                                let event_id = &event.event_id;
191                                let parent_event_id = &event.parent_event_id;
192                                let event_type = &event.event_type;
193                                let message = &event.message;
194                                let body = &event.body;
195                                let level = &event.level.as_str();
196
197                                let copy_result = copy_in_writer
198                                    .as_mut()
199                                    .write(&[
200                                        &namespace,
201                                        &version,
202                                        target,
203                                        &timestamp,
204                                        event_id,
205                                        parent_event_id,
206                                        level,
207                                        event_type,
208                                        message,
209                                        body,
210                                    ])
211                                    .await;
212
213                                if let Err(e) = copy_result {
214                                    log::error!("Error writing to PostgreSQL: {e}");
215                                    break 'write;
216                                }
217                            }
218
219                            let copy_in_finish_res = copy_in_writer.finish().await;
220                            if let Err(e) = copy_in_finish_res {
221                                log::error!("Error writing to PostgreSQL: {e}");
222                                continue 'recv;
223                            }
224                        }
225                        Err(e) => {
226                            log::error!(target: "telemetry", "Error writing to the database: {e}")
227                        }
228                    }
229                }
230            }
231        }
232    })
233}
234
235fn spawn_http_events_task(
236    mut http_rx: UnboundedReceiver<(DateTime<Local>, HttpLogData)>,
237    pool: Pool,
238    namespace: &'static str,
239    version: Option<&'static str>,
240    shutdown_signal: &'static ShutdownSignal,
241) -> JoinHandle<()> {
242    tokio::task::spawn(async move {
243        let types = &[
244            PgType::UUID,        // UUID
245            PgType::TEXT,        // Namespace
246            PgType::TEXT,        // Version
247            PgType::TEXT,        // Target
248            PgType::TEXT,        // Worker Route
249            PgType::INT4,        // Worker ID
250            PgType::INET,        // IpAddr
251            PgType::TEXT,        // Method
252            PgType::TEXT,        // Path
253            PgType::TEXT,        // Query Params
254            PgType::TEXT,        // HTTP Version
255            PgType::INT2,        // Status
256            PgType::TEXT,        // User Agent
257            PgType::INT8,        // Elapsed
258            PgType::TIMESTAMPTZ, // TIMESTAMP
259        ];
260        let mut logs_buffer = Vec::with_capacity(100);
261        let mut path_buffer = Vec::<u8>::new();
262        let mut query_buffer = Vec::<u8>::new();
263        let mut version_buffer = Vec::<u8>::new();
264        let mut user_agent_buffer = Vec::<u8>::new();
265
266        'recv: loop {
267            tokio::select! {
268                _ = shutdown_signal.wait() => break 'recv,
269                received = http_rx.recv_many(&mut logs_buffer, 100)  => {
270                    if received == 0 {
271                        break 'recv;
272                    }
273                    let connection = match pool.get().await {
274                        Ok(conn) => conn,
275                        Err(e) => {
276                            log::error!("Unable to acquire postgresql connection: {e}");
277                            continue 'recv;
278                        }
279                    };
280                    let copy_sink_res = connection
281                        .copy_in::<_, bytes::Bytes>(
282                            "COPY faucet_http_events FROM STDIN WITH (FORMAT binary)",
283                        )
284                        .await;
285
286                    match copy_sink_res {
287                        Ok(copy_sink) => {
288                            let copy_in_writer =
289                                tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
290
291                            let mut copy_in_writer = pin!(copy_in_writer);
292
293                            log::debug!("Writing {} http events to the database", logs_buffer.len());
294
295                            'write: for (timestamp, log_data) in logs_buffer.drain(..) {
296                                let uuid = &log_data.state_data.uuid;
297                                let target = &log_data.state_data.target;
298                                let worker_id = log_data.state_data.worker_id as i32;
299                                let worker_route = log_data.state_data.worker_route;
300                                let ip = &log_data.state_data.ip;
301                                let method = &log_data.method.as_str();
302                                let _ = write!(path_buffer, "{}", log_data.path.path());
303                                let path = &std::str::from_utf8(&path_buffer).unwrap_or_default();
304                                let _ = write!(
305                                    query_buffer,
306                                    "{}",
307                                    log_data.path.query().unwrap_or_default()
308                                );
309                                let query = &std::str::from_utf8(&query_buffer).unwrap_or_default();
310                                let query = if query.is_empty() { None } else { Some(query) };
311                                let _ = write!(version_buffer, "{:?}", log_data.version);
312                                let http_version =
313                                    &std::str::from_utf8(&version_buffer).unwrap_or_default();
314                                let status = &log_data.status;
315                                let user_agent = match &log_data.user_agent {
316                                    LogOption::Some(v) => v.to_str().ok(),
317                                    LogOption::None => None,
318                                };
319
320                                let elapsed = &log_data.elapsed;
321                                let copy_result = copy_in_writer
322                                    .as_mut()
323                                    .write(&[
324                                        uuid,
325                                        &namespace,
326                                        &version,
327                                        target,
328                                        &worker_route,
329                                        &worker_id,
330                                        ip,
331                                        method,
332                                        path,
333                                        &query,
334                                        http_version,
335                                        status,
336                                        &user_agent,
337                                        elapsed,
338                                        &timestamp,
339                                    ])
340                                    .await;
341
342                                path_buffer.clear();
343                                version_buffer.clear();
344                                user_agent_buffer.clear();
345                                query_buffer.clear();
346
347                                if let Err(e) = copy_result {
348                                    log::error!("Error writing to PostgreSQL: {e}");
349                                    break 'write;
350                                }
351                            }
352
353                            let copy_in_finish_res = copy_in_writer.finish().await;
354                            if let Err(e) = copy_in_finish_res {
355                                log::error!("Error writing to PostgreSQL: {e}");
356                                continue 'recv;
357                            }
358                        }
359                        Err(e) => {
360                            log::error!(target: "telemetry", "Error writing to the database: {e}")
361                        }
362                    }
363                }
364            }
365        }
366    })
367}
368
369fn handle_http_events(
370    pool: Pool,
371    namespace: &'static str,
372    version: Option<&'static str>,
373    shutdown_signal: &'static ShutdownSignal,
374) -> (
375    UnboundedSender<(chrono::DateTime<Local>, HttpLogData)>,
376    UnboundedSender<(chrono::DateTime<Local>, EventLogData)>,
377    JoinHandle<()>,
378    JoinHandle<()>,
379) {
380    let (http_tx, http_rx) = tokio::sync::mpsc::unbounded_channel::<(_, HttpLogData)>();
381    let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<(_, EventLogData)>();
382
383    let event_handle =
384        spawn_events_task(event_rx, pool.clone(), namespace, version, shutdown_signal);
385
386    let http_handle = spawn_http_events_task(http_rx, pool, namespace, version, shutdown_signal);
387    (http_tx, event_tx, http_handle, event_handle)
388}