faucet_server/telemetry/
mod.rs1use std::{pin::pin, str::FromStr};
2
3use chrono::Local;
4use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod};
5use std::io::Write;
6use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
7
8use crate::{
9 error::FaucetResult,
10 server::{LogData, LogOption},
11};
12
13#[derive(Clone)]
14pub struct TelemetrySender {
15 pub sender_http_events: UnboundedSender<(chrono::DateTime<Local>, LogData)>,
16}
17
18impl TelemetrySender {
19 pub fn send_http_event(&self, data: LogData) {
20 let timestamp = chrono::Local::now();
21 let _ = self.sender_http_events.send((timestamp, data));
22 }
23}
24
25pub struct TelemetryManager {
26 _pool: deadpool_postgres::Pool,
27 pub sender: TelemetrySender,
28 pub http_events_join_handle: JoinHandle<()>,
29}
30
31fn make_tls() -> tokio_postgres_rustls::MakeRustlsConnect {
32 let config = rustls::ClientConfig::builder()
33 .with_root_certificates(rustls::RootCertStore::empty())
34 .with_no_client_auth();
35
36 tokio_postgres_rustls::MakeRustlsConnect::new(config)
37}
38
39type PgType = tokio_postgres::types::Type;
40
41impl TelemetryManager {
42 pub fn start(
43 namespace: &str,
44 version: Option<&str>,
45 database_url: &str,
46 ) -> FaucetResult<TelemetryManager> {
47 let config = tokio_postgres::Config::from_str(database_url)?;
48 let mgr_config = ManagerConfig {
49 recycling_method: RecyclingMethod::Fast,
50 };
51 let mgr = Manager::from_config(config, make_tls(), mgr_config);
52 let pool = Pool::builder(mgr).max_size(10).build()?;
53
54 let (sender_http_events, http_events_join_handle) =
55 handle_http_events(pool.clone(), namespace, version);
56
57 Ok(TelemetryManager {
58 _pool: pool,
59 http_events_join_handle,
60 sender: TelemetrySender { sender_http_events },
61 })
62 }
63}
64
65fn handle_http_events(
66 pool: Pool,
67 namespace: &str,
68 version: Option<&str>,
69) -> (
70 UnboundedSender<(chrono::DateTime<Local>, LogData)>,
71 JoinHandle<()>,
72) {
73 let namespace = Box::<str>::from(namespace);
74 let version = version.map(Box::<str>::from);
75 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(_, LogData)>();
76 let handle = tokio::task::spawn(async move {
77 let types = &[
78 PgType::UUID, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::INT4, PgType::INET, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::TEXT, PgType::INT2, PgType::TEXT, PgType::INT8, PgType::TIMESTAMPTZ, ];
94 let mut logs_buffer = Vec::with_capacity(100);
95 let mut path_buffer = Vec::<u8>::new();
96 let mut query_buffer = Vec::<u8>::new();
97 let mut version_buffer = Vec::<u8>::new();
98 let mut user_agent_buffer = Vec::<u8>::new();
99
100 'recv: while rx.recv_many(&mut logs_buffer, 100).await > 0 {
101 let connection = match pool.get().await {
102 Ok(conn) => conn,
103 Err(e) => {
104 log::error!("Unable to acquire postgresql connection: {e}");
105 continue 'recv;
106 }
107 };
108 let copy_sink_res = connection
109 .copy_in::<_, bytes::Bytes>(
110 "COPY faucet_http_events FROM STDIN WITH (FORMAT binary)",
111 )
112 .await;
113
114 match copy_sink_res {
115 Ok(copy_sink) => {
116 let mut copy_in_writer =
117 tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types);
118
119 let mut copy_in_writer = pin!(copy_in_writer);
120
121 log::debug!("Writing {} http events to the database", logs_buffer.len());
122
123 'write: for (timestamp, log_data) in logs_buffer.drain(..) {
124 let uuid = &log_data.state_data.uuid;
125 let target = &log_data.state_data.target;
126 let worker_id = log_data.state_data.worker_id as i32;
127 let worker_route = log_data.state_data.worker_route;
128 let ip = &log_data.state_data.ip;
129 let method = &log_data.method.as_str();
130 let _ = write!(path_buffer, "{}", log_data.path.path());
131 let path = &std::str::from_utf8(&path_buffer).unwrap_or_default();
132 let _ = write!(
133 query_buffer,
134 "{}",
135 log_data.path.query().unwrap_or_default()
136 );
137 let query = &std::str::from_utf8(&query_buffer).unwrap_or_default();
138 let query = if query.is_empty() { None } else { Some(query) };
139 let _ = write!(version_buffer, "{:?}", log_data.version);
140 let http_version =
141 &std::str::from_utf8(&version_buffer).unwrap_or_default();
142 let status = &log_data.status;
143 let user_agent = match &log_data.user_agent {
144 LogOption::Some(v) => v.to_str().ok(),
145 LogOption::None => None,
146 };
147
148 let elapsed = &log_data.elapsed;
149 let copy_result = copy_in_writer
150 .as_mut()
151 .write(&[
152 uuid,
153 &namespace,
154 &version,
155 target,
156 &worker_route,
157 &worker_id,
158 ip,
159 method,
160 path,
161 &query,
162 http_version,
163 status,
164 &user_agent,
165 elapsed,
166 ×tamp,
167 ])
168 .await;
169
170 path_buffer.clear();
171 version_buffer.clear();
172 user_agent_buffer.clear();
173 query_buffer.clear();
174
175 if let Err(e) = copy_result {
176 log::error!("Error writing to PostgreSQL: {e}");
177 break 'write;
178 }
179 }
180
181 let copy_in_finish_res = copy_in_writer.finish().await;
182 if let Err(e) = copy_in_finish_res {
183 log::error!("Error writing to PostgreSQL: {e}");
184 continue 'recv;
185 }
186 }
187 Err(e) => {
188 log::error!(target: "telemetry", "Error writing to the database: {e}")
189 }
190 }
191 }
192 });
193 (tx, handle)
194}