1use std::{pin::pin, str::FromStr, sync::OnceLock};
2
3use chrono::{DateTime, Local};
4use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
5use std::io::Write;
6use tokio::{
7 sync::mpsc::{UnboundedReceiver, UnboundedSender},
8 task::JoinHandle,
9};
10
11use crate::{
12 error::FaucetResult,
13 leak,
14 server::{logging::EventLogData, HttpLogData, LogOption},
15 shutdown::ShutdownSignal,
16};
17
18#[derive(Clone, Debug)]
19pub struct TelemetrySender {
20 pub sender_http_events: UnboundedSender<(chrono::DateTime<Local>, HttpLogData)>,
21 pub sender_log_events: UnboundedSender<(chrono::DateTime<Local>, EventLogData)>,
22}
23
24impl TelemetrySender {
25 pub fn send_http_event(&self, data: HttpLogData) {
26 let timestamp = chrono::Local::now();
27 let _ = self.sender_http_events.send((timestamp, data));
28 }
29 pub fn send_log_event(&self, data: EventLogData) {
30 let timestamp = chrono::Local::now();
31 let _ = self.sender_log_events.send((timestamp, data));
32 }
33}
34
35pub struct TelemetryManager {
36 _pool: deadpool_postgres::Pool,
37 pub http_events_join_handle: JoinHandle<()>,
38 pub log_events_join_handle: JoinHandle<()>,
39}
40
41fn make_tls() -> tokio_postgres_rustls::MakeRustlsConnect {
42 let config = rustls::ClientConfig::builder()
43 .with_root_certificates(rustls::RootCertStore::empty())
44 .with_no_client_auth();
45
46 tokio_postgres_rustls::MakeRustlsConnect::new(config)
47}
48
49type PgType = tokio_postgres::types::Type;
50
51static TELEMETRY_SENDER: OnceLock<TelemetrySender> = OnceLock::new();
52
53pub fn send_http_event(http_event: HttpLogData) {
54 if let Some(sender) = TELEMETRY_SENDER.get() {
55 sender.send_http_event(http_event);
56 }
57}
58
59pub fn send_log_event(http_event: EventLogData) {
60 if let Some(sender) = TELEMETRY_SENDER.get() {
61 sender.send_log_event(http_event);
62 }
63}
64
65impl TelemetryManager {
66 pub fn start(
67 namespace: &str,
68 version: Option<&str>,
69 database_url: &str,
70 shutdown_signal: &'static ShutdownSignal,
71 ) -> FaucetResult<TelemetryManager> {
72 let namespace = leak!(namespace) as &'static str;
73 let version = version.map(|v| leak!(v) as &'static str);
74
75 let config = tokio_postgres::Config::from_str(database_url)?;
76 let mgr_config = ManagerConfig {
77 recycling_method: RecyclingMethod::Fast,
78 };
79 let mgr = Manager::from_config(config, make_tls(), mgr_config);
80 let pool = Pool::builder(mgr).max_size(10).build()?;
81
82 let (
83 sender_http_events,
84 sender_log_events,
85 http_events_join_handle,
86 log_events_join_handle,
87 ) = handle_http_events(pool.clone(), namespace, version, shutdown_signal);
88
89 let sender = TelemetrySender {
90 sender_http_events,
91 sender_log_events,
92 };
93
94 TELEMETRY_SENDER
95 .set(sender)
96 .expect("Unable to set telemetry sender. This is a bug! Report it!");
97
98 Ok(TelemetryManager {
99 _pool: pool,
100 http_events_join_handle,
101 log_events_join_handle,
102 })
103 }
104}
105
106fn spawn_events_task(
107 mut event_rx: UnboundedReceiver<(chrono::DateTime<Local>, EventLogData)>,
108 pool: Pool,
109 namespace: &'static str,
110 version: Option<&'static str>,
111 shutdown_signal: &'static ShutdownSignal,
112) -> JoinHandle<()> {
113 tokio::task::spawn(async move {
114 let types = &[
115 PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::TIMESTAMPTZ, PgType::UUID, PgType::UUID, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::JSONB, ];
126 let mut logs_buffer = Vec::with_capacity(100);
127
128 'recv: loop {
129 tokio::select! {
130 _ = shutdown_signal.wait() => break 'recv,
131 received = event_rx.recv_many(&mut logs_buffer, 100) => {
132 if received == 0 {
133 break 'recv;
134 }
135 let connection = match pool.get().await {
136 Ok(conn) => conn,
137 Err(e) => {
138 log::error!("Unable to acquire postgresql connection: {e}");
139 continue 'recv;
140 }
141 };
142 let copy_sink_res = connection
143 .copy_in::<_, bytes::Bytes>(
144 "COPY faucet_log_events FROM STDIN WITH (FORMAT binary)",
145 )
146 .await;
147
148 match copy_sink_res {
149 Ok(copy_sink) => {
150 let copy_in_writer =
151 tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
152
153 let mut copy_in_writer = pin!(copy_in_writer);
154
155 log::debug!("Writing {} log events to the database", logs_buffer.len());
156
157 'write: for (timestamp, event) in logs_buffer.drain(..) {
158 let target = &event.target;
159 let event_id = &event.event_id;
160 let parent_event_id = &event.parent_event_id;
161 let event_type = &event.event_type;
162 let message = &event.message;
163 let body = &event.body;
164 let level = &event.level.as_str();
165
166 let copy_result = copy_in_writer
167 .as_mut()
168 .write(&[
169 &namespace,
170 &version,
171 target,
172 ×tamp,
173 event_id,
174 parent_event_id,
175 level,
176 event_type,
177 message,
178 body,
179 ])
180 .await;
181
182 if let Err(e) = copy_result {
183 log::error!("Error writing to PostgreSQL: {e}");
184 break 'write;
185 }
186 }
187
188 let copy_in_finish_res = copy_in_writer.finish().await;
189 if let Err(e) = copy_in_finish_res {
190 log::error!("Error writing to PostgreSQL: {e}");
191 continue 'recv;
192 }
193 }
194 Err(e) => {
195 log::error!(target: "telemetry", "Error writing to the database: {e}")
196 }
197 }
198 }
199 }
200 }
201 })
202}
203
204fn spawn_http_events_task(
205 mut http_rx: UnboundedReceiver<(DateTime<Local>, HttpLogData)>,
206 pool: Pool,
207 namespace: &'static str,
208 version: Option<&'static str>,
209 shutdown_signal: &'static ShutdownSignal,
210) -> JoinHandle<()> {
211 tokio::task::spawn(async move {
212 let types = &[
213 PgType::UUID, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::INT4, PgType::INET, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::INT2, PgType::TEXT, PgType::INT8, PgType::TIMESTAMPTZ, ];
229 let mut logs_buffer = Vec::with_capacity(100);
230 let mut path_buffer = Vec::<u8>::new();
231 let mut query_buffer = Vec::<u8>::new();
232 let mut version_buffer = Vec::<u8>::new();
233 let mut user_agent_buffer = Vec::<u8>::new();
234
235 'recv: loop {
236 tokio::select! {
237 _ = shutdown_signal.wait() => break 'recv,
238 received = http_rx.recv_many(&mut logs_buffer, 100) => {
239 if received == 0 {
240 break 'recv;
241 }
242 let connection = match pool.get().await {
243 Ok(conn) => conn,
244 Err(e) => {
245 log::error!("Unable to acquire postgresql connection: {e}");
246 continue 'recv;
247 }
248 };
249 let copy_sink_res = connection
250 .copy_in::<_, bytes::Bytes>(
251 "COPY faucet_http_events FROM STDIN WITH (FORMAT binary)",
252 )
253 .await;
254
255 match copy_sink_res {
256 Ok(copy_sink) => {
257 let copy_in_writer =
258 tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
259
260 let mut copy_in_writer = pin!(copy_in_writer);
261
262 log::debug!("Writing {} http events to the database", logs_buffer.len());
263
264 'write: for (timestamp, log_data) in logs_buffer.drain(..) {
265 let uuid = &log_data.state_data.uuid;
266 let target = &log_data.state_data.target;
267 let worker_id = log_data.state_data.worker_id as i32;
268 let worker_route = log_data.state_data.worker_route;
269 let ip = &log_data.state_data.ip;
270 let method = &log_data.method.as_str();
271 let _ = write!(path_buffer, "{}", log_data.path.path());
272 let path = &std::str::from_utf8(&path_buffer).unwrap_or_default();
273 let _ = write!(
274 query_buffer,
275 "{}",
276 log_data.path.query().unwrap_or_default()
277 );
278 let query = &std::str::from_utf8(&query_buffer).unwrap_or_default();
279 let query = if query.is_empty() { None } else { Some(query) };
280 let _ = write!(version_buffer, "{:?}", log_data.version);
281 let http_version =
282 &std::str::from_utf8(&version_buffer).unwrap_or_default();
283 let status = &log_data.status;
284 let user_agent = match &log_data.user_agent {
285 LogOption::Some(v) => v.to_str().ok(),
286 LogOption::None => None,
287 };
288
289 let elapsed = &log_data.elapsed;
290 let copy_result = copy_in_writer
291 .as_mut()
292 .write(&[
293 uuid,
294 &namespace,
295 &version,
296 target,
297 &worker_route,
298 &worker_id,
299 ip,
300 method,
301 path,
302 &query,
303 http_version,
304 status,
305 &user_agent,
306 elapsed,
307 ×tamp,
308 ])
309 .await;
310
311 path_buffer.clear();
312 version_buffer.clear();
313 user_agent_buffer.clear();
314 query_buffer.clear();
315
316 if let Err(e) = copy_result {
317 log::error!("Error writing to PostgreSQL: {e}");
318 break 'write;
319 }
320 }
321
322 let copy_in_finish_res = copy_in_writer.finish().await;
323 if let Err(e) = copy_in_finish_res {
324 log::error!("Error writing to PostgreSQL: {e}");
325 continue 'recv;
326 }
327 }
328 Err(e) => {
329 log::error!(target: "telemetry", "Error writing to the database: {e}")
330 }
331 }
332 }
333 }
334 }
335 })
336}
337
338fn handle_http_events(
339 pool: Pool,
340 namespace: &'static str,
341 version: Option<&'static str>,
342 shutdown_signal: &'static ShutdownSignal,
343) -> (
344 UnboundedSender<(chrono::DateTime<Local>, HttpLogData)>,
345 UnboundedSender<(chrono::DateTime<Local>, EventLogData)>,
346 JoinHandle<()>,
347 JoinHandle<()>,
348) {
349 let (http_tx, http_rx) = tokio::sync::mpsc::unbounded_channel::<(_, HttpLogData)>();
350 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<(_, EventLogData)>();
351
352 let event_handle =
353 spawn_events_task(event_rx, pool.clone(), namespace, version, shutdown_signal);
354
355 let http_handle = spawn_http_events_task(http_rx, pool, namespace, version, shutdown_signal);
356 (http_tx, event_tx, http_handle, event_handle)
357}