1use std::{path::Path, pin::pin, str::FromStr, sync::OnceLock};
2
3use chrono::{DateTime, Local};
4use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
5use std::io::Write;
6use tokio::{
7 sync::mpsc::{UnboundedReceiver, UnboundedSender},
8 task::JoinHandle,
9};
10
11use crate::{
12 cli::PgSslMode,
13 error::FaucetResult,
14 leak,
15 server::{logging::EventLogData, HttpLogData, LogOption},
16 shutdown::ShutdownSignal,
17};
18
19#[derive(Clone, Debug)]
20pub struct TelemetrySender {
21 pub sender_http_events: UnboundedSender<(chrono::DateTime<Local>, HttpLogData)>,
22 pub sender_log_events: UnboundedSender<(chrono::DateTime<Local>, EventLogData)>,
23}
24
25impl TelemetrySender {
26 pub fn send_http_event(&self, data: HttpLogData) {
27 let timestamp = chrono::Local::now();
28 let _ = self.sender_http_events.send((timestamp, data));
29 }
30 pub fn send_log_event(&self, data: EventLogData) {
31 let timestamp = chrono::Local::now();
32 let _ = self.sender_log_events.send((timestamp, data));
33 }
34}
35
36pub struct TelemetryManager {
37 _pool: deadpool_postgres::Pool,
38 pub http_events_join_handle: JoinHandle<()>,
39 pub log_events_join_handle: JoinHandle<()>,
40}
41
42fn make_tls(
43 sslmode: PgSslMode,
44 sslcert: Option<&Path>,
45) -> tokio_postgres_rustls::MakeRustlsConnect {
46 let mut root_store = rustls::RootCertStore::empty();
47
48 if matches!(sslmode, PgSslMode::VerifyCa | PgSslMode::VerifyFull) {
49 match sslcert {
50 Some(cert_path) => {
51 let mut reader =
52 std::io::BufReader::new(std::fs::File::open(cert_path).unwrap_or_else(|e| {
53 panic!("Failed to open certificate file '{:?}': {}", cert_path, e)
54 }));
55 if let Ok(certs) = rustls_pemfile::certs(&mut reader) {
56 for cert in certs {
57 if let Err(e) = root_store.add(cert.clone().into()) {
58 log::error!("Failed to add PEM certificate: {}", e);
59 }
60 }
61 }
62 }
63 None => panic!(
64 "Specified {} but did not provide a certificate path.",
65 sslmode.as_str()
66 ),
67 }
68 }
69
70 let config = rustls::ClientConfig::builder()
71 .with_root_certificates(root_store)
72 .with_no_client_auth();
73
74 tokio_postgres_rustls::MakeRustlsConnect::new(config)
75}
76
/// Shorthand for the PostgreSQL type descriptors used when declaring the
/// column types of the binary `COPY` writers.
type PgType = tokio_postgres::types::Type;

/// Process-wide telemetry sender. It stays unset until
/// `TelemetryManager::start` installs it; the free `send_*` functions do
/// nothing while it is unset.
static TELEMETRY_SENDER: OnceLock<TelemetrySender> = OnceLock::new();
80
81pub fn send_http_event(http_event: HttpLogData) {
82 if let Some(sender) = TELEMETRY_SENDER.get() {
83 sender.send_http_event(http_event);
84 }
85}
86
87pub fn send_log_event(http_event: EventLogData) {
88 if let Some(sender) = TELEMETRY_SENDER.get() {
89 sender.send_log_event(http_event);
90 }
91}
92
93impl TelemetryManager {
94 pub fn start(
95 namespace: &str,
96 version: Option<&str>,
97 database_url: &str,
98 sslmode: PgSslMode,
99 sslcert: Option<&Path>,
100 shutdown_signal: &'static ShutdownSignal,
101 ) -> FaucetResult<TelemetryManager> {
102 log::debug!("Connecting to PostgreSQL with params: namespace='{}', version='{:?}', database_url='[REDACTED]'", namespace, version);
103 let namespace = leak!(namespace) as &'static str;
104 let version = version.map(|v| leak!(v) as &'static str);
105
106 let config = tokio_postgres::Config::from_str(database_url)?;
107 let mgr_config = ManagerConfig {
108 recycling_method: RecyclingMethod::Fast,
109 };
110 let mgr = Manager::from_config(config, make_tls(sslmode, sslcert), mgr_config);
111 let pool = Pool::builder(mgr).max_size(10).build()?;
112
113 let (
114 sender_http_events,
115 sender_log_events,
116 http_events_join_handle,
117 log_events_join_handle,
118 ) = handle_http_events(pool.clone(), namespace, version, shutdown_signal);
119
120 let sender = TelemetrySender {
121 sender_http_events,
122 sender_log_events,
123 };
124
125 TELEMETRY_SENDER
126 .set(sender)
127 .expect("Unable to set telemetry sender. This is a bug! Report it!");
128
129 Ok(TelemetryManager {
130 _pool: pool,
131 http_events_join_handle,
132 log_events_join_handle,
133 })
134 }
135}
136
137fn spawn_events_task(
138 mut event_rx: UnboundedReceiver<(chrono::DateTime<Local>, EventLogData)>,
139 pool: Pool,
140 namespace: &'static str,
141 version: Option<&'static str>,
142 shutdown_signal: &'static ShutdownSignal,
143) -> JoinHandle<()> {
144 tokio::task::spawn(async move {
145 let types = &[
146 PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::TIMESTAMPTZ, PgType::UUID, PgType::UUID, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::JSONB, ];
157 let mut logs_buffer = Vec::with_capacity(100);
158
159 'recv: loop {
160 tokio::select! {
161 _ = shutdown_signal.wait() => break 'recv,
162 received = event_rx.recv_many(&mut logs_buffer, 100) => {
163 if received == 0 {
164 break 'recv;
165 }
166 let connection = match pool.get().await {
167 Ok(conn) => conn,
168 Err(e) => {
169 log::error!("Unable to acquire postgresql connection: {e}");
170 continue 'recv;
171 }
172 };
173 let copy_sink_res = connection
174 .copy_in::<_, bytes::Bytes>(
175 "COPY faucet_log_events FROM STDIN WITH (FORMAT binary)",
176 )
177 .await;
178
179 match copy_sink_res {
180 Ok(copy_sink) => {
181 let copy_in_writer =
182 tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
183
184 let mut copy_in_writer = pin!(copy_in_writer);
185
186 log::debug!("Writing {} log events to the database", logs_buffer.len());
187
188 'write: for (timestamp, event) in logs_buffer.drain(..) {
189 let target = &event.target;
190 let event_id = &event.event_id;
191 let parent_event_id = &event.parent_event_id;
192 let event_type = &event.event_type;
193 let message = &event.message;
194 let body = &event.body;
195 let level = &event.level.as_str();
196
197 let copy_result = copy_in_writer
198 .as_mut()
199 .write(&[
200 &namespace,
201 &version,
202 target,
203 ×tamp,
204 event_id,
205 parent_event_id,
206 level,
207 event_type,
208 message,
209 body,
210 ])
211 .await;
212
213 if let Err(e) = copy_result {
214 log::error!("Error writing to PostgreSQL: {e}");
215 break 'write;
216 }
217 }
218
219 let copy_in_finish_res = copy_in_writer.finish().await;
220 if let Err(e) = copy_in_finish_res {
221 log::error!("Error writing to PostgreSQL: {e}");
222 continue 'recv;
223 }
224 }
225 Err(e) => {
226 log::error!(target: "telemetry", "Error writing to the database: {e}")
227 }
228 }
229 }
230 }
231 }
232 })
233}
234
235fn spawn_http_events_task(
236 mut http_rx: UnboundedReceiver<(DateTime<Local>, HttpLogData)>,
237 pool: Pool,
238 namespace: &'static str,
239 version: Option<&'static str>,
240 shutdown_signal: &'static ShutdownSignal,
241) -> JoinHandle<()> {
242 tokio::task::spawn(async move {
243 let types = &[
244 PgType::UUID, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::INT4, PgType::INET, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::INT2, PgType::TEXT, PgType::INT8, PgType::TIMESTAMPTZ, ];
260 let mut logs_buffer = Vec::with_capacity(100);
261 let mut path_buffer = Vec::<u8>::new();
262 let mut query_buffer = Vec::<u8>::new();
263 let mut version_buffer = Vec::<u8>::new();
264 let mut user_agent_buffer = Vec::<u8>::new();
265
266 'recv: loop {
267 tokio::select! {
268 _ = shutdown_signal.wait() => break 'recv,
269 received = http_rx.recv_many(&mut logs_buffer, 100) => {
270 if received == 0 {
271 break 'recv;
272 }
273 let connection = match pool.get().await {
274 Ok(conn) => conn,
275 Err(e) => {
276 log::error!("Unable to acquire postgresql connection: {e}");
277 continue 'recv;
278 }
279 };
280 let copy_sink_res = connection
281 .copy_in::<_, bytes::Bytes>(
282 "COPY faucet_http_events FROM STDIN WITH (FORMAT binary)",
283 )
284 .await;
285
286 match copy_sink_res {
287 Ok(copy_sink) => {
288 let copy_in_writer =
289 tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
290
291 let mut copy_in_writer = pin!(copy_in_writer);
292
293 log::debug!("Writing {} http events to the database", logs_buffer.len());
294
295 'write: for (timestamp, log_data) in logs_buffer.drain(..) {
296 let uuid = &log_data.state_data.uuid;
297 let target = &log_data.state_data.target;
298 let worker_id = log_data.state_data.worker_id as i32;
299 let worker_route = log_data.state_data.worker_route;
300 let ip = &log_data.state_data.ip;
301 let method = &log_data.method.as_str();
302 let _ = write!(path_buffer, "{}", log_data.path.path());
303 let path = &std::str::from_utf8(&path_buffer).unwrap_or_default();
304 let _ = write!(
305 query_buffer,
306 "{}",
307 log_data.path.query().unwrap_or_default()
308 );
309 let query = &std::str::from_utf8(&query_buffer).unwrap_or_default();
310 let query = if query.is_empty() { None } else { Some(query) };
311 let _ = write!(version_buffer, "{:?}", log_data.version);
312 let http_version =
313 &std::str::from_utf8(&version_buffer).unwrap_or_default();
314 let status = &log_data.status;
315 let user_agent = match &log_data.user_agent {
316 LogOption::Some(v) => v.to_str().ok(),
317 LogOption::None => None,
318 };
319
320 let elapsed = &log_data.elapsed;
321 let copy_result = copy_in_writer
322 .as_mut()
323 .write(&[
324 uuid,
325 &namespace,
326 &version,
327 target,
328 &worker_route,
329 &worker_id,
330 ip,
331 method,
332 path,
333 &query,
334 http_version,
335 status,
336 &user_agent,
337 elapsed,
338 ×tamp,
339 ])
340 .await;
341
342 path_buffer.clear();
343 version_buffer.clear();
344 user_agent_buffer.clear();
345 query_buffer.clear();
346
347 if let Err(e) = copy_result {
348 log::error!("Error writing to PostgreSQL: {e}");
349 break 'write;
350 }
351 }
352
353 let copy_in_finish_res = copy_in_writer.finish().await;
354 if let Err(e) = copy_in_finish_res {
355 log::error!("Error writing to PostgreSQL: {e}");
356 continue 'recv;
357 }
358 }
359 Err(e) => {
360 log::error!(target: "telemetry", "Error writing to the database: {e}")
361 }
362 }
363 }
364 }
365 }
366 })
367}
368
369fn handle_http_events(
370 pool: Pool,
371 namespace: &'static str,
372 version: Option<&'static str>,
373 shutdown_signal: &'static ShutdownSignal,
374) -> (
375 UnboundedSender<(chrono::DateTime<Local>, HttpLogData)>,
376 UnboundedSender<(chrono::DateTime<Local>, EventLogData)>,
377 JoinHandle<()>,
378 JoinHandle<()>,
379) {
380 let (http_tx, http_rx) = tokio::sync::mpsc::unbounded_channel::<(_, HttpLogData)>();
381 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<(_, EventLogData)>();
382
383 let event_handle =
384 spawn_events_task(event_rx, pool.clone(), namespace, version, shutdown_signal);
385
386 let http_handle = spawn_http_events_task(http_rx, pool, namespace, version, shutdown_signal);
387 (http_tx, event_tx, http_handle, event_handle)
388}