faucet_server/server/
logging.rs

1use hyper::{http::HeaderValue, Method, Request, Response, Uri, Version};
2use uuid::Uuid;
3
4use super::onion::{Layer, Service};
5use crate::{server::service::State, telemetry::send_http_event};
6use std::{net::IpAddr, time};
7
8pub mod logger {
9    use std::{io::BufWriter, io::Write, path::PathBuf};
10
11    use hyper::body::Bytes;
12    use tokio::task::JoinHandle;
13
14    use crate::shutdown::ShutdownSignal;
15
16    pub enum Target {
17        Stderr,
18        File(PathBuf),
19    }
20
21    struct LogFileWriter {
22        sender: tokio::sync::mpsc::Sender<Bytes>,
23    }
24
25    impl std::io::Write for LogFileWriter {
26        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
27            let _ = self.sender.try_send(Bytes::copy_from_slice(buf));
28            Ok(buf.len())
29        }
30        fn flush(&mut self) -> std::io::Result<()> {
31            Ok(())
32        }
33    }
34
35    fn start_log_writer_thread(
36        path: PathBuf,
37        max_file_size: Option<u64>,
38        shutdown: &'static ShutdownSignal,
39    ) -> (LogFileWriter, JoinHandle<()>) {
40        let max_file_size = max_file_size.unwrap_or(u64::MAX);
41        let mut current_file_size = match std::fs::metadata(&path) {
42            Ok(md) => md.len(),
43            Err(_) => 0,
44        };
45        let file = std::fs::File::options()
46            .create(true)
47            .append(true)
48            .truncate(false)
49            .open(&path)
50            .expect("Unable to open or create log file");
51
52        // Create a file path to a backup of the previous logs with MAX file size
53        let mut copy_path = path.clone();
54        copy_path.as_mut_os_string().push(".bak");
55
56        let mut writer = BufWriter::new(file);
57        let mut stderr = BufWriter::new(std::io::stderr());
58        let (sender, mut receiver) = tokio::sync::mpsc::channel::<Bytes>(1000);
59        let writer_thread = tokio::task::spawn(async move {
60            loop {
61                tokio::select! {
62                    bytes = receiver.recv() => {
63                        match bytes {
64                            Some(bytes) => {
65                                if let Err(e) = stderr.write_all(bytes.as_ref()) {
66                                    eprintln!("Unable to write to stderr: {e}");
67                                };
68
69                                if let Err(e) = writer.write_all(bytes.as_ref()) {
70                                    eprintln!("Unable to write to {path:?}: {e}");
71                                };
72
73                                current_file_size += bytes.len() as u64;
74                                if current_file_size > max_file_size {
75                                    // Flush the writer
76                                    let _ = writer.flush();
77                                    let file = writer.get_mut();
78
79                                    // Copy the current file to the backup
80                                    if let Err(e) = std::fs::copy(&path, &copy_path) {
81                                        log::error!("Unable to copy logs to backup file: {e}");
82                                    }
83
84                                    // Truncate the logs file
85                                    if let Err(e) = file.set_len(0) {
86                                        log::error!("Unable to truncate logs file: {e}");
87                                    }
88
89                                    current_file_size = 0;
90                                }
91                            },
92                            None => break
93                        }
94                    },
95                    _ = shutdown.wait() => break
96                }
97            }
98            let _ = writer.flush();
99            let _ = stderr.flush();
100        });
101        (LogFileWriter { sender }, writer_thread)
102    }
103
104    pub fn build_logger(
105        target: Target,
106        max_file_size: Option<u64>,
107        shutdown: &'static ShutdownSignal,
108    ) -> Option<JoinHandle<()>> {
109        let (target, handle) = match target {
110            Target::File(path) => {
111                let (writer, handle) = start_log_writer_thread(path, max_file_size, shutdown);
112                (env_logger::Target::Pipe(Box::new(writer)), Some(handle))
113            }
114            Target::Stderr => (env_logger::Target::Stderr, None),
115        };
116
117        let mut env_builder = env_logger::Builder::new();
118        env_builder
119            .parse_env(env_logger::Env::new().filter_or("FAUCET_LOG", "info"))
120            .target(target)
121            .init();
122
123        handle
124    }
125}
126
127#[derive(Clone, Copy)]
128pub struct StateData {
129    pub uuid: uuid::Uuid,
130    pub ip: IpAddr,
131    pub worker_route: Option<&'static str>,
132    pub worker_id: usize,
133    pub target: &'static str,
134}
135
136trait StateLogData: Send + Sync + 'static {
137    fn get_state_data(&self) -> StateData;
138}
139
140impl StateLogData for State {
141    #[inline(always)]
142    fn get_state_data(&self) -> StateData {
143        let uuid = self.uuid;
144        let ip = self.remote_addr;
145        let worker_id = self.client.config.worker_id;
146        let worker_route = self.client.config.worker_route;
147        let target = self.client.config.target;
148        StateData {
149            uuid,
150            ip,
151            worker_id,
152            worker_route,
153            target,
154        }
155    }
156}
157
158#[derive(PartialEq, Eq)]
159pub enum LogOption<T> {
160    None,
161    Some(T),
162}
163
164impl<T> From<Option<T>> for LogOption<T> {
165    fn from(opt: Option<T>) -> Self {
166        match opt {
167            None => LogOption::None,
168            Some(v) => LogOption::Some(v),
169        }
170    }
171}
172
173impl<T> std::fmt::Display for LogOption<T>
174where
175    T: std::fmt::Display,
176{
177    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
178        match self {
179            LogOption::None => write!(f, "-"),
180            LogOption::Some(v) => write!(f, "{v}"),
181        }
182    }
183}
184
185impl<T> std::fmt::Debug for LogOption<T>
186where
187    T: std::fmt::Debug,
188{
189    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
190        match self {
191            LogOption::None => write!(f, r#""-""#),
192            LogOption::Some(v) => write!(f, "{v:?}"),
193        }
194    }
195}
196
197pub struct HttpLogData {
198    pub state_data: StateData,
199    pub method: Method,
200    pub path: Uri,
201    pub version: Version,
202    pub status: i16,
203    pub user_agent: LogOption<HeaderValue>,
204    pub elapsed: i64,
205}
206
207impl HttpLogData {
208    fn log(&self) {
209        log::info!(
210            target: self.state_data.target,
211            r#"{ip} "{method} {route}{path} {version:?}" {status} {user_agent:?} {elapsed}"#,
212            route = self.state_data.worker_route.map(|r| r.trim_end_matches('/')).unwrap_or_default(),
213            ip = self.state_data.ip,
214            method = self.method,
215            path = self.path,
216            version = self.version,
217            status = self.status,
218            user_agent = self.user_agent,
219            elapsed = self.elapsed,
220        );
221    }
222}
223
224#[inline(always)]
225async fn capture_log_data<Body, ResBody, Error, State: StateLogData>(
226    inner: &impl Service<Request<Body>, Response = Response<ResBody>, Error = Error>,
227    req: Request<Body>,
228) -> Result<(Response<ResBody>, HttpLogData), Error> {
229    let start = time::Instant::now();
230
231    // Extract request info for logging
232    let state = req.extensions().get::<State>().expect("State not found");
233    let state_data = state.get_state_data();
234    let method = req.method().clone();
235    let path = req.uri().clone();
236    let version = req.version();
237    let user_agent: LogOption<_> = req.headers().get(hyper::header::USER_AGENT).cloned().into();
238
239    // Make the request
240    let res = inner.call(req, None).await?;
241
242    // Extract response info for logging
243    let status = res.status().as_u16() as i16;
244    let elapsed = start.elapsed().as_millis() as i64;
245
246    let log_data = HttpLogData {
247        state_data,
248        method,
249        path,
250        version,
251        status,
252        user_agent,
253        elapsed,
254    };
255
256    Ok((res, log_data))
257}
258
259pub(super) struct LogService<S> {
260    inner: S,
261}
262
263impl<S, Body, ResBody> Service<Request<Body>> for LogService<S>
264where
265    S: Service<Request<Body>, Response = Response<ResBody>> + Send + Sync,
266{
267    type Error = S::Error;
268    type Response = Response<ResBody>;
269
270    async fn call(
271        &self,
272        req: Request<Body>,
273        _: Option<IpAddr>,
274    ) -> Result<Self::Response, Self::Error> {
275        let (res, log_data) = capture_log_data::<_, _, _, State>(&self.inner, req).await?;
276
277        log_data.log();
278        send_http_event(log_data);
279
280        Ok(res)
281    }
282}
283
284pub(super) struct LogLayer {}
285
286impl<S> Layer<S> for LogLayer {
287    type Service = LogService<S>;
288    fn layer(&self, inner: S) -> Self::Service {
289        LogService { inner }
290    }
291}
292
293#[derive(serde::Deserialize, Clone, Copy)]
294pub enum FaucetTracingLevel {
295    Error,
296    Warn,
297    Info,
298    Debug,
299    Trace,
300}
301
302impl FaucetTracingLevel {
303    pub fn as_str(self) -> &'static str {
304        match self {
305            FaucetTracingLevel::Trace => "trace",
306            FaucetTracingLevel::Debug => "debug",
307            FaucetTracingLevel::Error => "error",
308            FaucetTracingLevel::Warn => "warn",
309            FaucetTracingLevel::Info => "info",
310        }
311    }
312}
313
314#[derive(serde::Deserialize)]
315pub struct EventLogData {
316    pub target: String,
317    pub event_id: Uuid,
318    pub level: FaucetTracingLevel,
319    pub parent_event_id: Option<Uuid>,
320    pub event_type: String,
321    pub message: String,
322    pub body: Option<serde_json::Value>,
323}
324
325#[derive(Debug)]
326pub enum FaucetEventParseError<'a> {
327    UnableToSplit,
328    InvalidString(&'a str),
329    SerdeError {
330        err: serde_json::Error,
331        str: &'a str,
332    },
333}
334
335pub enum FaucetEventResult<'a> {
336    Event(EventLogData),
337    Output(&'a str),
338    EventError(FaucetEventParseError<'a>),
339}
340
341pub fn parse_faucet_event(content: &str) -> FaucetEventResult<'_> {
342    use FaucetEventResult::*;
343
344    let content = content.trim_end_matches('\n');
345
346    if !content.starts_with("{{ faucet_event }}:") {
347        return Output(content);
348    }
349
350    match content.split_once(':') {
351        Some((_, content)) => {
352            let structure: EventLogData = match serde_json::from_str(content.trim()) {
353                Ok(structure) => structure,
354                Err(e) => {
355                    return FaucetEventResult::EventError(FaucetEventParseError::SerdeError {
356                        err: e,
357                        str: content,
358                    })
359                }
360            };
361            Event(structure)
362        }
363        None => EventError(FaucetEventParseError::UnableToSplit),
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use hyper::StatusCode;
370
371    use super::*;
372
373    #[tokio::test]
374    async fn log_capture() {
375        #[derive(Clone)]
376        struct MockState;
377
378        impl StateLogData for MockState {
379            fn get_state_data(&self) -> StateData {
380                StateData {
381                    uuid: uuid::Uuid::now_v7(),
382                    ip: IpAddr::V4([127, 0, 0, 1].into()),
383                    target: "test",
384                    worker_id: 1,
385                    worker_route: None,
386                }
387            }
388        }
389
390        struct Svc;
391
392        impl Service<Request<()>> for Svc {
393            type Response = Response<()>;
394            type Error = ();
395            async fn call(
396                &self,
397                _: Request<()>,
398                _: Option<IpAddr>,
399            ) -> Result<Self::Response, Self::Error> {
400                tokio::time::sleep(std::time::Duration::from_millis(5)).await;
401                Ok(Response::builder().status(StatusCode::OK).body(()).unwrap())
402            }
403        }
404
405        let req = Request::builder()
406            .method(Method::GET)
407            .uri("https://example.com/")
408            .extension(MockState)
409            .version(Version::HTTP_11)
410            .header(hyper::header::USER_AGENT, "test")
411            .body(())
412            .unwrap();
413
414        let (_, log_data) = capture_log_data::<_, _, _, MockState>(&Svc, req)
415            .await
416            .unwrap();
417
418        assert_eq!(log_data.state_data.ip, IpAddr::V4([127, 0, 0, 1].into()));
419        assert_eq!(log_data.method, Method::GET);
420        assert_eq!(log_data.path, "https://example.com/");
421        assert_eq!(log_data.version, Version::HTTP_11);
422        assert_eq!(log_data.status, 200);
423        assert_eq!(
424            log_data.user_agent,
425            LogOption::Some(HeaderValue::from_static("test"))
426        );
427        assert!(log_data.elapsed > 0);
428        assert_eq!(log_data.state_data.target, "test");
429    }
430
431    #[test]
432    fn log_option_display() {
433        assert_eq!(LogOption::<u8>::None.to_string(), "-");
434        assert_eq!(LogOption::Some(1).to_string(), "1");
435    }
436
437    #[test]
438    fn log_option_debug() {
439        assert_eq!(format!("{:?}", LogOption::<u8>::None), r#""-""#);
440        assert_eq!(format!("{:?}", LogOption::Some(1)), "1");
441    }
442
443    #[test]
444    fn log_option_from_option() {
445        assert_eq!(LogOption::<u8>::from(None), LogOption::None);
446        assert_eq!(LogOption::from(Some(1)), LogOption::Some(1));
447    }
448
449    #[test]
450    fn log_data_log() {
451        use std::io::Write;
452        use std::sync::{Arc, Mutex};
453
454        struct Buffer(Arc<Mutex<Vec<u8>>>);
455
456        impl Write for Buffer {
457            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
458                self.0.lock().unwrap().write(buf)
459            }
460            fn flush(&mut self) -> std::io::Result<()> {
461                self.0.lock().unwrap().flush()
462            }
463        }
464
465        impl Buffer {
466            fn clone_buf(&self) -> Vec<u8> {
467                self.0.lock().unwrap().clone()
468            }
469        }
470
471        impl Clone for Buffer {
472            fn clone(&self) -> Self {
473                Buffer(Arc::clone(&self.0))
474            }
475        }
476
477        let log_data = HttpLogData {
478            state_data: StateData {
479                uuid: uuid::Uuid::now_v7(),
480                target: "test",
481                ip: IpAddr::V4([127, 0, 0, 1].into()),
482                worker_route: None,
483                worker_id: 1,
484            },
485            method: Method::GET,
486            path: "https://example.com/".parse().unwrap(),
487            version: Version::HTTP_11,
488            status: 200,
489            user_agent: LogOption::Some(HeaderValue::from_static("test")),
490            elapsed: 5,
491        };
492
493        let buf = Buffer(Arc::new(Mutex::new(Vec::new())));
494        let mut logger = env_logger::Builder::new();
495        // ALWAYS USE INFO LEVEL FOR LOGGING
496        logger.filter_level(log::LevelFilter::Info);
497        logger.format(|f, record| writeln!(f, "{}", record.args()));
498        logger.target(env_logger::Target::Pipe(Box::new(buf.clone())));
499        logger.init();
500
501        log_data.log();
502
503        let log = String::from_utf8(buf.clone_buf()).unwrap();
504
505        assert_eq!(
506            log.trim(),
507            r#"127.0.0.1 "GET https://example.com/ HTTP/1.1" 200 "test" 5"#
508        )
509    }
510}