faucet_server/telemetry/
mod.rs

1use std::{pin::pin, str::FromStr};
2
3use chrono::Local;
4use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
5use std::io::Write;
6use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
7
8use crate::{
9    error::FaucetResult,
10    server::{LogData, LogOption},
11};
12
13#[derive(Clone)]
14pub struct TelemetrySender {
15    pub sender_http_events: UnboundedSender<(chrono::DateTime<Local>, LogData)>,
16}
17
18impl TelemetrySender {
19    pub fn send_http_event(&self, data: LogData) {
20        let timestamp = chrono::Local::now();
21        let _ = self.sender_http_events.send((timestamp, data));
22    }
23}
24
25pub struct TelemetryManager {
26    _pool: deadpool_postgres::Pool,
27    pub sender: TelemetrySender,
28    pub http_events_join_handle: JoinHandle<()>,
29}
30
31fn make_tls() -> tokio_postgres_rustls::MakeRustlsConnect {
32    let config = rustls::ClientConfig::builder()
33        .with_root_certificates(rustls::RootCertStore::empty())
34        .with_no_client_auth();
35
36    tokio_postgres_rustls::MakeRustlsConnect::new(config)
37}
38
39type PgType = tokio_postgres::types::Type;
40
41impl TelemetryManager {
42    pub fn start(
43        namespace: &str,
44        version: Option<&str>,
45        database_url: &str,
46    ) -> FaucetResult<TelemetryManager> {
47        let config = tokio_postgres::Config::from_str(database_url)?;
48        let mgr_config = ManagerConfig {
49            recycling_method: RecyclingMethod::Fast,
50        };
51        let mgr = Manager::from_config(config, make_tls(), mgr_config);
52        let pool = Pool::builder(mgr).max_size(10).build()?;
53
54        let (sender_http_events, http_events_join_handle) =
55            handle_http_events(pool.clone(), namespace, version);
56
57        Ok(TelemetryManager {
58            _pool: pool,
59            http_events_join_handle,
60            sender: TelemetrySender { sender_http_events },
61        })
62    }
63}
64
65fn handle_http_events(
66    pool: Pool,
67    namespace: &str,
68    version: Option<&str>,
69) -> (
70    UnboundedSender<(chrono::DateTime<Local>, LogData)>,
71    JoinHandle<()>,
72) {
73    let namespace = Box::<str>::from(namespace);
74    let version = version.map(Box::<str>::from);
75    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(_, LogData)>();
76    let handle = tokio::task::spawn(async move {
77        let types = &[
78            PgType::UUID,        // UUID
79            PgType::TEXT,        // Namespace
80            PgType::TEXT,        // Version
81            PgType::TEXT,        // Target
82            PgType::TEXT,        // Worker Route
83            PgType::INT4,        // Worker ID
84            PgType::INET,        // IpAddr
85            PgType::TEXT,        // Method
86            PgType::TEXT,        // Path
87            PgType::TEXT,        // Query Params
88            PgType::TEXT,        // HTTP Version
89            PgType::INT2,        // Status
90            PgType::TEXT,        // User Agent
91            PgType::INT8,        // Elapsed
92            PgType::TIMESTAMPTZ, // TIMESTAMP
93        ];
94        let mut logs_buffer = Vec::with_capacity(100);
95        let mut path_buffer = Vec::<u8>::new();
96        let mut query_buffer = Vec::<u8>::new();
97        let mut version_buffer = Vec::<u8>::new();
98        let mut user_agent_buffer = Vec::<u8>::new();
99
100        'recv: while rx.recv_many(&mut logs_buffer, 100).await > 0 {
101            let connection = match pool.get().await {
102                Ok(conn) => conn,
103                Err(e) => {
104                    log::error!("Unable to acquire postgresql connection: {e}");
105                    continue 'recv;
106                }
107            };
108            let copy_sink_res = connection
109                .copy_in::<_, bytes::Bytes>(
110                    "COPY faucet_http_events FROM STDIN WITH (FORMAT binary)",
111                )
112                .await;
113
114            match copy_sink_res {
115                Ok(copy_sink) => {
116                    let mut copy_in_writer =
117                        tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
118
119                    let mut copy_in_writer = pin!(copy_in_writer);
120
121                    log::debug!("Writing {} http events to the database", logs_buffer.len());
122
123                    'write: for (timestamp, log_data) in logs_buffer.drain(..) {
124                        let uuid = &log_data.state_data.uuid;
125                        let target = &log_data.state_data.target;
126                        let worker_id = log_data.state_data.worker_id as i32;
127                        let worker_route = log_data.state_data.worker_route;
128                        let ip = &log_data.state_data.ip;
129                        let method = &log_data.method.as_str();
130                        let _ = write!(path_buffer, "{}", log_data.path.path());
131                        let path = &std::str::from_utf8(&path_buffer).unwrap_or_default();
132                        let _ = write!(
133                            query_buffer,
134                            "{}",
135                            log_data.path.query().unwrap_or_default()
136                        );
137                        let query = &std::str::from_utf8(&query_buffer).unwrap_or_default();
138                        let query = if query.is_empty() { None } else { Some(query) };
139                        let _ = write!(version_buffer, "{:?}", log_data.version);
140                        let http_version =
141                            &std::str::from_utf8(&version_buffer).unwrap_or_default();
142                        let status = &log_data.status;
143                        let user_agent = match &log_data.user_agent {
144                            LogOption::Some(v) => v.to_str().ok(),
145                            LogOption::None => None,
146                        };
147
148                        let elapsed = &log_data.elapsed;
149                        let copy_result = copy_in_writer
150                            .as_mut()
151                            .write(&[
152                                uuid,
153                                &namespace,
154                                &version,
155                                target,
156                                &worker_route,
157                                &worker_id,
158                                ip,
159                                method,
160                                path,
161                                &query,
162                                http_version,
163                                status,
164                                &user_agent,
165                                elapsed,
166                                &timestamp,
167                            ])
168                            .await;
169
170                        path_buffer.clear();
171                        version_buffer.clear();
172                        user_agent_buffer.clear();
173                        query_buffer.clear();
174
175                        if let Err(e) = copy_result {
176                            log::error!("Error writing to PostgreSQL: {e}");
177                            break 'write;
178                        }
179                    }
180
181                    let copy_in_finish_res = copy_in_writer.finish().await;
182                    if let Err(e) = copy_in_finish_res {
183                        log::error!("Error writing to PostgreSQL: {e}");
184                        continue 'recv;
185                    }
186                }
187                Err(e) => {
188                    log::error!(target: "telemetry", "Error writing to the database: {e}")
189                }
190            }
191        }
192    });
193    (tx, handle)
194}