faucet_server/telemetry/
mod.rs

1use std::{pin::pin, str::FromStr, sync::OnceLock};
2
3use chrono::{DateTime, Local};
4use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
5use std::io::Write;
6use tokio::{
7    sync::mpsc::{UnboundedReceiver, UnboundedSender},
8    task::JoinHandle,
9};
10
11use crate::{
12    error::FaucetResult,
13    leak,
14    server::{logging::EventLogData, HttpLogData, LogOption},
15    shutdown::ShutdownSignal,
16};
17
18#[derive(Clone, Debug)]
19pub struct TelemetrySender {
20    pub sender_http_events: UnboundedSender<(chrono::DateTime<Local>, HttpLogData)>,
21    pub sender_log_events: UnboundedSender<(chrono::DateTime<Local>, EventLogData)>,
22}
23
24impl TelemetrySender {
25    pub fn send_http_event(&self, data: HttpLogData) {
26        let timestamp = chrono::Local::now();
27        let _ = self.sender_http_events.send((timestamp, data));
28    }
29    pub fn send_log_event(&self, data: EventLogData) {
30        let timestamp = chrono::Local::now();
31        let _ = self.sender_log_events.send((timestamp, data));
32    }
33}
34
35pub struct TelemetryManager {
36    _pool: deadpool_postgres::Pool,
37    pub http_events_join_handle: JoinHandle<()>,
38    pub log_events_join_handle: JoinHandle<()>,
39}
40
41fn make_tls() -> tokio_postgres_rustls::MakeRustlsConnect {
42    let config = rustls::ClientConfig::builder()
43        .with_root_certificates(rustls::RootCertStore::empty())
44        .with_no_client_auth();
45
46    tokio_postgres_rustls::MakeRustlsConnect::new(config)
47}
48
49type PgType = tokio_postgres::types::Type;
50
51static TELEMETRY_SENDER: OnceLock<TelemetrySender> = OnceLock::new();
52
53pub fn send_http_event(http_event: HttpLogData) {
54    if let Some(sender) = TELEMETRY_SENDER.get() {
55        sender.send_http_event(http_event);
56    }
57}
58
59pub fn send_log_event(http_event: EventLogData) {
60    if let Some(sender) = TELEMETRY_SENDER.get() {
61        sender.send_log_event(http_event);
62    }
63}
64
65impl TelemetryManager {
66    pub fn start(
67        namespace: &str,
68        version: Option<&str>,
69        database_url: &str,
70        shutdown_signal: &'static ShutdownSignal,
71    ) -> FaucetResult<TelemetryManager> {
72        let namespace = leak!(namespace) as &'static str;
73        let version = version.map(|v| leak!(v) as &'static str);
74
75        let config = tokio_postgres::Config::from_str(database_url)?;
76        let mgr_config = ManagerConfig {
77            recycling_method: RecyclingMethod::Fast,
78        };
79        let mgr = Manager::from_config(config, make_tls(), mgr_config);
80        let pool = Pool::builder(mgr).max_size(10).build()?;
81
82        let (
83            sender_http_events,
84            sender_log_events,
85            http_events_join_handle,
86            log_events_join_handle,
87        ) = handle_http_events(pool.clone(), namespace, version, shutdown_signal);
88
89        let sender = TelemetrySender {
90            sender_http_events,
91            sender_log_events,
92        };
93
94        TELEMETRY_SENDER
95            .set(sender)
96            .expect("Unable to set telemetry sender. This is a bug! Report it!");
97
98        Ok(TelemetryManager {
99            _pool: pool,
100            http_events_join_handle,
101            log_events_join_handle,
102        })
103    }
104}
105
106fn spawn_events_task(
107    mut event_rx: UnboundedReceiver<(chrono::DateTime<Local>, EventLogData)>,
108    pool: Pool,
109    namespace: &'static str,
110    version: Option<&'static str>,
111    shutdown_signal: &'static ShutdownSignal,
112) -> JoinHandle<()> {
113    tokio::task::spawn(async move {
114        let types = &[
115            PgType::TEXT,        // Namespace
116            PgType::TEXT,        // Version
117            PgType::TEXT,        // Target
118            PgType::TIMESTAMPTZ, // Timestamp
119            PgType::UUID,        // Event_Id
120            PgType::UUID,        // Parent_Event_Id
121            PgType::TEXT,        // Level
122            PgType::TEXT,        // Event Type
123            PgType::TEXT,        // Message
124            PgType::JSONB,       // Body
125        ];
126        let mut logs_buffer = Vec::with_capacity(100);
127
128        'recv: loop {
129            tokio::select! {
130                    _ = shutdown_signal.wait() => break 'recv,
131                    received = event_rx.recv_many(&mut logs_buffer, 100)  => {
132                        if received == 0 {
133                            break 'recv;
134                        }
135                    let connection = match pool.get().await {
136                        Ok(conn) => conn,
137                        Err(e) => {
138                            log::error!("Unable to acquire postgresql connection: {e}");
139                            continue 'recv;
140                        }
141                    };
142                    let copy_sink_res = connection
143                        .copy_in::<_, bytes::Bytes>(
144                            "COPY faucet_log_events FROM STDIN WITH (FORMAT binary)",
145                        )
146                        .await;
147
148                    match copy_sink_res {
149                        Ok(copy_sink) => {
150                            let copy_in_writer =
151                                tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
152
153                            let mut copy_in_writer = pin!(copy_in_writer);
154
155                            log::debug!("Writing {} log events to the database", logs_buffer.len());
156
157                            'write: for (timestamp, event) in logs_buffer.drain(..) {
158                                let target = &event.target;
159                                let event_id = &event.event_id;
160                                let parent_event_id = &event.parent_event_id;
161                                let event_type = &event.event_type;
162                                let message = &event.message;
163                                let body = &event.body;
164                                let level = &event.level.as_str();
165
166                                let copy_result = copy_in_writer
167                                    .as_mut()
168                                    .write(&[
169                                        &namespace,
170                                        &version,
171                                        target,
172                                        &timestamp,
173                                        event_id,
174                                        parent_event_id,
175                                        level,
176                                        event_type,
177                                        message,
178                                        body,
179                                    ])
180                                    .await;
181
182                                if let Err(e) = copy_result {
183                                    log::error!("Error writing to PostgreSQL: {e}");
184                                    break 'write;
185                                }
186                            }
187
188                            let copy_in_finish_res = copy_in_writer.finish().await;
189                            if let Err(e) = copy_in_finish_res {
190                                log::error!("Error writing to PostgreSQL: {e}");
191                                continue 'recv;
192                            }
193                        }
194                        Err(e) => {
195                            log::error!(target: "telemetry", "Error writing to the database: {e}")
196                        }
197                    }
198                }
199            }
200        }
201    })
202}
203
204fn spawn_http_events_task(
205    mut http_rx: UnboundedReceiver<(DateTime<Local>, HttpLogData)>,
206    pool: Pool,
207    namespace: &'static str,
208    version: Option<&'static str>,
209    shutdown_signal: &'static ShutdownSignal,
210) -> JoinHandle<()> {
211    tokio::task::spawn(async move {
212        let types = &[
213            PgType::UUID,        // UUID
214            PgType::TEXT,        // Namespace
215            PgType::TEXT,        // Version
216            PgType::TEXT,        // Target
217            PgType::TEXT,        // Worker Route
218            PgType::INT4,        // Worker ID
219            PgType::INET,        // IpAddr
220            PgType::TEXT,        // Method
221            PgType::TEXT,        // Path
222            PgType::TEXT,        // Query Params
223            PgType::TEXT,        // HTTP Version
224            PgType::INT2,        // Status
225            PgType::TEXT,        // User Agent
226            PgType::INT8,        // Elapsed
227            PgType::TIMESTAMPTZ, // TIMESTAMP
228        ];
229        let mut logs_buffer = Vec::with_capacity(100);
230        let mut path_buffer = Vec::<u8>::new();
231        let mut query_buffer = Vec::<u8>::new();
232        let mut version_buffer = Vec::<u8>::new();
233        let mut user_agent_buffer = Vec::<u8>::new();
234
235        'recv: loop {
236            tokio::select! {
237                _ = shutdown_signal.wait() => break 'recv,
238                received = http_rx.recv_many(&mut logs_buffer, 100)  => {
239                    if received == 0 {
240                        break 'recv;
241                    }
242                    let connection = match pool.get().await {
243                        Ok(conn) => conn,
244                        Err(e) => {
245                            log::error!("Unable to acquire postgresql connection: {e}");
246                            continue 'recv;
247                        }
248                    };
249                    let copy_sink_res = connection
250                        .copy_in::<_, bytes::Bytes>(
251                            "COPY faucet_http_events FROM STDIN WITH (FORMAT binary)",
252                        )
253                        .await;
254
255                    match copy_sink_res {
256                        Ok(copy_sink) => {
257                            let copy_in_writer =
258                                tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
259
260                            let mut copy_in_writer = pin!(copy_in_writer);
261
262                            log::debug!("Writing {} http events to the database", logs_buffer.len());
263
264                            'write: for (timestamp, log_data) in logs_buffer.drain(..) {
265                                let uuid = &log_data.state_data.uuid;
266                                let target = &log_data.state_data.target;
267                                let worker_id = log_data.state_data.worker_id as i32;
268                                let worker_route = log_data.state_data.worker_route;
269                                let ip = &log_data.state_data.ip;
270                                let method = &log_data.method.as_str();
271                                let _ = write!(path_buffer, "{}", log_data.path.path());
272                                let path = &std::str::from_utf8(&path_buffer).unwrap_or_default();
273                                let _ = write!(
274                                    query_buffer,
275                                    "{}",
276                                    log_data.path.query().unwrap_or_default()
277                                );
278                                let query = &std::str::from_utf8(&query_buffer).unwrap_or_default();
279                                let query = if query.is_empty() { None } else { Some(query) };
280                                let _ = write!(version_buffer, "{:?}", log_data.version);
281                                let http_version =
282                                    &std::str::from_utf8(&version_buffer).unwrap_or_default();
283                                let status = &log_data.status;
284                                let user_agent = match &log_data.user_agent {
285                                    LogOption::Some(v) => v.to_str().ok(),
286                                    LogOption::None => None,
287                                };
288
289                                let elapsed = &log_data.elapsed;
290                                let copy_result = copy_in_writer
291                                    .as_mut()
292                                    .write(&[
293                                        uuid,
294                                        &namespace,
295                                        &version,
296                                        target,
297                                        &worker_route,
298                                        &worker_id,
299                                        ip,
300                                        method,
301                                        path,
302                                        &query,
303                                        http_version,
304                                        status,
305                                        &user_agent,
306                                        elapsed,
307                                        &timestamp,
308                                    ])
309                                    .await;
310
311                                path_buffer.clear();
312                                version_buffer.clear();
313                                user_agent_buffer.clear();
314                                query_buffer.clear();
315
316                                if let Err(e) = copy_result {
317                                    log::error!("Error writing to PostgreSQL: {e}");
318                                    break 'write;
319                                }
320                            }
321
322                            let copy_in_finish_res = copy_in_writer.finish().await;
323                            if let Err(e) = copy_in_finish_res {
324                                log::error!("Error writing to PostgreSQL: {e}");
325                                continue 'recv;
326                            }
327                        }
328                        Err(e) => {
329                            log::error!(target: "telemetry", "Error writing to the database: {e}")
330                        }
331                    }
332                }
333            }
334        }
335    })
336}
337
338fn handle_http_events(
339    pool: Pool,
340    namespace: &'static str,
341    version: Option<&'static str>,
342    shutdown_signal: &'static ShutdownSignal,
343) -> (
344    UnboundedSender<(chrono::DateTime<Local>, HttpLogData)>,
345    UnboundedSender<(chrono::DateTime<Local>, EventLogData)>,
346    JoinHandle<()>,
347    JoinHandle<()>,
348) {
349    let (http_tx, http_rx) = tokio::sync::mpsc::unbounded_channel::<(_, HttpLogData)>();
350    let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<(_, EventLogData)>();
351
352    let event_handle =
353        spawn_events_task(event_rx, pool.clone(), namespace, version, shutdown_signal);
354
355    let http_handle = spawn_http_events_task(http_rx, pool, namespace, version, shutdown_signal);
356    (http_tx, event_tx, http_handle, event_handle)
357}