1use hyper::{http::HeaderValue, Method, Request, Response, Uri, Version};
2use uuid::Uuid;
3
4use super::onion::{Layer, Service};
5use crate::{server::service::State, telemetry::send_http_event};
6use std::{net::IpAddr, time};
7
8pub mod logger {
9 use std::{io::BufWriter, io::Write, path::PathBuf};
10
11 use hyper::body::Bytes;
12 use tokio::task::JoinHandle;
13
14 use crate::shutdown::ShutdownSignal;
15
    /// Destination selected for the process logger (consumed by `build_logger`).
    pub enum Target {
        /// Log straight to standard error.
        Stderr,
        /// Log to the file at this path through a background writer task,
        /// which also mirrors every chunk to standard error.
        File(PathBuf),
    }
20
21 struct LogFileWriter {
22 sender: tokio::sync::mpsc::Sender<Bytes>,
23 }
24
25 impl std::io::Write for LogFileWriter {
26 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
27 let _ = self.sender.try_send(Bytes::copy_from_slice(buf));
28 Ok(buf.len())
29 }
30 fn flush(&mut self) -> std::io::Result<()> {
31 Ok(())
32 }
33 }
34
35 fn start_log_writer_thread(
36 path: PathBuf,
37 max_file_size: Option<u64>,
38 shutdown: &'static ShutdownSignal,
39 ) -> (LogFileWriter, JoinHandle<()>) {
40 let max_file_size = max_file_size.unwrap_or(u64::MAX);
41 let mut current_file_size = match std::fs::metadata(&path) {
42 Ok(md) => md.len(),
43 Err(_) => 0,
44 };
45 let file = std::fs::File::options()
46 .create(true)
47 .append(true)
48 .truncate(false)
49 .open(&path)
50 .expect("Unable to open or create log file");
51
52 let mut copy_path = path.clone();
54 copy_path.as_mut_os_string().push(".bak");
55
56 let mut writer = BufWriter::new(file);
57 let mut stderr = BufWriter::new(std::io::stderr());
58 let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(1000);
59 let writer_thread = tokio::task::spawn(async move {
60 loop {
61 tokio::select! {
62 bytes = receiver.recv() => {
63 match bytes {
64 Some(bytes) => {
65 if let Err(e) = stderr.write_all(bytes.as_ref()) {
66 eprintln!("Unable to write to stderr: {e}");
67 };
68
69 if let Err(e) = writer.write_all(bytes.as_ref()) {
70 eprintln!("Unable to write to {path:?}: {e}");
71 };
72
73 current_file_size += bytes.len() as u64;
74 if current_file_size > max_file_size {
75 let _ = writer.flush();
77 let file = writer.get_mut();
78
79 if let Err(e) = std::fs::copy(&path, ©_path) {
81 log::error!("Unable to copy logs to backup file: {e}");
82 }
83
84 if let Err(e) = file.set_len(0) {
86 log::error!("Unable to truncate logs file: {e}");
87 }
88
89 current_file_size = 0;
90 }
91 },
92 None => break
93 }
94 },
95 _ = shutdown.wait() => break
96 }
97 }
98 let _ = writer.flush();
99 let _ = stderr.flush();
100 });
101 (LogFileWriter { sender }, writer_thread)
102 }
103
104 pub fn build_logger(
105 target: Target,
106 max_file_size: Option<u64>,
107 shutdown: &'static ShutdownSignal,
108 ) -> Option<JoinHandle<()>> {
109 let (target, handle) = match target {
110 Target::File(path) => {
111 let (writer, handle) = start_log_writer_thread(path, max_file_size, shutdown);
112 (env_logger::Target::Pipe(Box::new(writer)), Some(handle))
113 }
114 Target::Stderr => (env_logger::Target::Stderr, None),
115 };
116
117 let mut env_builder = env_logger::Builder::new();
118 env_builder
119 .parse_env(env_logger::Env::new().filter_or("FAUCET_LOG", "info"))
120 .target(target)
121 .init();
122
123 handle
124 }
125}
126
/// Copyable snapshot of the request-state fields used when logging a request.
#[derive(Clone, Copy)]
pub struct StateData {
    /// Identifier copied from the request state.
    pub uuid: uuid::Uuid,
    /// Remote address copied from the request state.
    pub ip: IpAddr,
    /// Route of the worker, if any; it is logged as a prefix of the request path.
    pub worker_route: Option<&'static str>,
    /// Worker identifier taken from the client configuration.
    pub worker_id: usize,
    /// Log target under which the access-log line is emitted.
    pub target: &'static str,
}
135
/// Source of the [`StateData`] attached to a request.
///
/// `capture_log_data` looks up an implementor in the request extensions, which
/// lets the tests substitute a mock for the real `State`.
trait StateLogData: Send + Sync + 'static {
    /// Returns a copy of the data to include in the log entry.
    fn get_state_data(&self) -> StateData;
}
139
140impl StateLogData for State {
141 #[inline(always)]
142 fn get_state_data(&self) -> StateData {
143 let uuid = self.uuid;
144 let ip = self.remote_addr;
145 let worker_id = self.client.config.worker_id;
146 let worker_route = self.client.config.worker_route;
147 let target = self.client.config.target;
148 StateData {
149 uuid,
150 ip,
151 worker_id,
152 worker_route,
153 target,
154 }
155 }
156}
157
/// Optional value with log-friendly formatting: a missing value is rendered as
/// a dash instead of `None`.
#[derive(PartialEq, Eq)]
pub enum LogOption<T> {
    /// No value is available.
    None,
    /// A value is available.
    Some(T),
}

impl<T> From<Option<T>> for LogOption<T> {
    /// Maps `Option::None` / `Option::Some` onto the matching variant.
    fn from(opt: Option<T>) -> Self {
        opt.map_or(LogOption::None, LogOption::Some)
    }
}

impl<T> std::fmt::Display for LogOption<T>
where
    T: std::fmt::Display,
{
    /// Writes `-` for a missing value, otherwise the value's `Display` output.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self::Some(value) = self else {
            return f.write_str("-");
        };
        write!(f, "{value}")
    }
}

impl<T> std::fmt::Debug for LogOption<T>
where
    T: std::fmt::Debug,
{
    /// Writes a quoted dash (`"-"`) for a missing value, otherwise the value's
    /// `Debug` output.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        if let Self::Some(value) = self {
            write!(f, "{value:?}")
        } else {
            f.write_str(r#""-""#)
        }
    }
}
196
/// Everything recorded about one handled HTTP request.
pub struct HttpLogData {
    /// Request and worker metadata taken from the request state.
    pub state_data: StateData,
    /// HTTP method of the request.
    pub method: Method,
    /// URI of the request; logged right after the worker route prefix.
    pub path: Uri,
    /// HTTP protocol version of the request.
    pub version: Version,
    /// Status code of the response.
    pub status: i16,
    /// `User-Agent` request header, or `LogOption::None` when it was absent.
    pub user_agent: LogOption<HeaderValue>,
    /// Time spent producing the response, in milliseconds.
    pub elapsed: i64,
}
206
207impl HttpLogData {
208 fn log(&self) {
209 log::info!(
210 target: self.state_data.target,
211 r#"{ip} "{method} {route}{path} {version:?}" {status} {user_agent:?} {elapsed}"#,
212 route = self.state_data.worker_route.map(|r| r.trim_end_matches('/')).unwrap_or_default(),
213 ip = self.state_data.ip,
214 method = self.method,
215 path = self.path,
216 version = self.version,
217 status = self.status,
218 user_agent = self.user_agent,
219 elapsed = self.elapsed,
220 );
221 }
222}
223
224#[inline(always)]
225async fn capture_log_data<Body, ResBody, Error, State: StateLogData>(
226 inner: &impl Service<Request<Body>, Response = Response<ResBody>, Error = Error>,
227 req: Request<Body>,
228) -> Result<(Response<ResBody>, HttpLogData), Error> {
229 let start = time::Instant::now();
230
231 let state = req.extensions().get::<State>().expect("State not found");
233 let state_data = state.get_state_data();
234 let method = req.method().clone();
235 let path = req.uri().clone();
236 let version = req.version();
237 let user_agent: LogOption<_> = req.headers().get(hyper::header::USER_AGENT).cloned().into();
238
239 let res = inner.call(req, None).await?;
241
242 let status = res.status().as_u16() as i16;
244 let elapsed = start.elapsed().as_millis() as i64;
245
246 let log_data = HttpLogData {
247 state_data,
248 method,
249 path,
250 version,
251 status,
252 user_agent,
253 elapsed,
254 };
255
256 Ok((res, log_data))
257}
258
259pub(super) struct LogService<S> {
260 inner: S,
261}
262
263impl<S, Body, ResBody> Service<Request<Body>> for LogService<S>
264where
265 S: Service<Request<Body>, Response = Response<ResBody>> + Send + Sync,
266{
267 type Error = S::Error;
268 type Response = Response<ResBody>;
269
270 async fn call(
271 &self,
272 req: Request<Body>,
273 _: Option<IpAddr>,
274 ) -> Result<Self::Response, Self::Error> {
275 let (res, log_data) = capture_log_data::<_, _, _, State>(&self.inner, req).await?;
276
277 log_data.log();
278 send_http_event(log_data);
279
280 Ok(res)
281 }
282}
283
284pub(super) struct LogLayer {}
285
286impl<S> Layer<S> for LogLayer {
287 type Service = LogService<S>;
288 fn layer(&self, inner: S) -> Self::Service {
289 LogService { inner }
290 }
291}
292
293#[derive(serde::Deserialize, Clone, Copy)]
294pub enum FaucetTracingLevel {
295 Error,
296 Warn,
297 Info,
298 Debug,
299 Trace,
300}
301
302impl FaucetTracingLevel {
303 pub fn as_str(self) -> &'static str {
304 match self {
305 FaucetTracingLevel::Trace => "trace",
306 FaucetTracingLevel::Debug => "debug",
307 FaucetTracingLevel::Error => "error",
308 FaucetTracingLevel::Warn => "warn",
309 FaucetTracingLevel::Info => "info",
310 }
311 }
312}
313
/// Structured event deserialized from the JSON payload of a
/// `{{ faucet_event }}:` line (see `parse_faucet_event`).
#[derive(serde::Deserialize)]
pub struct EventLogData {
    pub target: String,
    pub event_id: Uuid,
    pub level: FaucetTracingLevel,
    /// May be absent from the payload.
    pub parent_event_id: Option<Uuid>,
    pub event_type: String,
    pub message: String,
    /// Arbitrary JSON attached to the event; may be absent from the payload.
    pub body: Option<serde_json::Value>,
}
324
/// Reasons a line carrying the faucet event marker could not be turned into an
/// [`EventLogData`].
#[derive(Debug)]
pub enum FaucetEventParseError<'a> {
    /// The event marker could not be separated from its payload.
    UnableToSplit,
    /// NOTE(review): not constructed anywhere in the code visible here; the
    /// exact meaning should be confirmed against its users.
    InvalidString(&'a str),
    /// The payload was not valid JSON for an [`EventLogData`].
    SerdeError {
        /// Error reported by `serde_json`.
        err: serde_json::Error,
        /// Payload text that failed to parse.
        str: &'a str,
    },
}
334
/// Outcome of inspecting one line of output with `parse_faucet_event`.
pub enum FaucetEventResult<'a> {
    /// The line carried a well-formed event.
    Event(EventLogData),
    /// The line did not start with the event marker; holds the line with its
    /// trailing newlines removed.
    Output(&'a str),
    /// The line started with the event marker but could not be parsed.
    EventError(FaucetEventParseError<'a>),
}
340
341pub fn parse_faucet_event(content: &str) -> FaucetEventResult<'_> {
342 use FaucetEventResult::*;
343
344 let content = content.trim_end_matches('\n');
345
346 if !content.starts_with("{{ faucet_event }}:") {
347 return Output(content);
348 }
349
350 match content.split_once(':') {
351 Some((_, content)) => {
352 let structure: EventLogData = match serde_json::from_str(content.trim()) {
353 Ok(structure) => structure,
354 Err(e) => {
355 return FaucetEventResult::EventError(FaucetEventParseError::SerdeError {
356 err: e,
357 str: content,
358 })
359 }
360 };
361 Event(structure)
362 }
363 None => EventError(FaucetEventParseError::UnableToSplit),
364 }
365}
366
367#[cfg(test)]
368mod tests {
369 use hyper::StatusCode;
370
371 use super::*;
372
    /// Checks that `capture_log_data` records the request metadata, the
    /// response status and a non-zero elapsed time.
    #[tokio::test]
    async fn log_capture() {
        // Stand-in for the real `State` that returns fixed log data.
        #[derive(Clone)]
        struct MockState;

        impl StateLogData for MockState {
            fn get_state_data(&self) -> StateData {
                StateData {
                    uuid: uuid::Uuid::now_v7(),
                    ip: IpAddr::V4([127, 0, 0, 1].into()),
                    target: "test",
                    worker_id: 1,
                    worker_route: None,
                }
            }
        }

        // Inner service that answers 200 after a short delay.
        struct Svc;

        impl Service<Request<()>> for Svc {
            type Response = Response<()>;
            type Error = ();
            async fn call(
                &self,
                _: Request<()>,
                _: Option<IpAddr>,
            ) -> Result<Self::Response, Self::Error> {
                // The delay keeps the measured elapsed milliseconds above zero.
                tokio::time::sleep(std::time::Duration::from_millis(5)).await;
                Ok(Response::builder().status(StatusCode::OK).body(()).unwrap())
            }
        }

        let req = Request::builder()
            .method(Method::GET)
            .uri("https://example.com/")
            .extension(MockState)
            .version(Version::HTTP_11)
            .header(hyper::header::USER_AGENT, "test")
            .body(())
            .unwrap();

        let (_, log_data) = capture_log_data::<_, _, _, MockState>(&Svc, req)
            .await
            .unwrap();

        assert_eq!(log_data.state_data.ip, IpAddr::V4([127, 0, 0, 1].into()));
        assert_eq!(log_data.method, Method::GET);
        assert_eq!(log_data.path, "https://example.com/");
        assert_eq!(log_data.version, Version::HTTP_11);
        assert_eq!(log_data.status, 200);
        assert_eq!(
            log_data.user_agent,
            LogOption::Some(HeaderValue::from_static("test"))
        );
        assert!(log_data.elapsed > 0);
        assert_eq!(log_data.state_data.target, "test");
    }
430
431 #[test]
432 fn log_option_display() {
433 assert_eq!(LogOption::<u8>::None.to_string(), "-");
434 assert_eq!(LogOption::Some(1).to_string(), "1");
435 }
436
437 #[test]
438 fn log_option_debug() {
439 assert_eq!(format!("{:?}", LogOption::<u8>::None), r#""-""#);
440 assert_eq!(format!("{:?}", LogOption::Some(1)), "1");
441 }
442
443 #[test]
444 fn log_option_from_option() {
445 assert_eq!(LogOption::<u8>::from(None), LogOption::None);
446 assert_eq!(LogOption::from(Some(1)), LogOption::Some(1));
447 }
448
    /// Checks the exact access-log line produced by `HttpLogData::log` by
    /// routing the logger output into an in-memory buffer.
    #[test]
    fn log_data_log() {
        use std::io::Write;
        use std::sync::{Arc, Mutex};

        // Shared in-memory sink: the logger writes into it and the test reads
        // the collected bytes back afterwards.
        struct Buffer(Arc<Mutex<Vec<u8>>>);

        impl Write for Buffer {
            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
                self.0.lock().unwrap().write(buf)
            }
            fn flush(&mut self) -> std::io::Result<()> {
                self.0.lock().unwrap().flush()
            }
        }

        impl Buffer {
            // Returns a copy of everything written so far.
            fn clone_buf(&self) -> Vec<u8> {
                self.0.lock().unwrap().clone()
            }
        }

        // Cloning shares the same underlying vector.
        impl Clone for Buffer {
            fn clone(&self) -> Self {
                Buffer(Arc::clone(&self.0))
            }
        }

        let log_data = HttpLogData {
            state_data: StateData {
                uuid: uuid::Uuid::now_v7(),
                target: "test",
                ip: IpAddr::V4([127, 0, 0, 1].into()),
                worker_route: None,
                worker_id: 1,
            },
            method: Method::GET,
            path: "https://example.com/".parse().unwrap(),
            version: Version::HTTP_11,
            status: 200,
            user_agent: LogOption::Some(HeaderValue::from_static("test")),
            elapsed: 5,
        };

        let buf = Buffer(Arc::new(Mutex::new(Vec::new())));
        let mut logger = env_logger::Builder::new();
        logger.filter_level(log::LevelFilter::Info);
        // Print only the message so the assertion can match the line exactly.
        logger.format(|f, record| writeln!(f, "{}", record.args()));
        logger.target(env_logger::Target::Pipe(Box::new(buf.clone())));
        // The logger has to be initialised before `log()` is called below.
        logger.init();

        log_data.log();

        let log = String::from_utf8(buf.clone_buf()).unwrap();

        assert_eq!(
            log.trim(),
            r#"127.0.0.1 "GET https://example.com/ HTTP/1.1" 200 "test" 5"#
        )
    }
510}