rd_util/
journal_tailer.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2use anyhow::Result;
3use crossbeam::channel::{self, select, Receiver, Sender};
4use log::{debug, error, warn};
5use std::collections::VecDeque;
6use std::os::unix::ffi::OsStrExt;
7use std::process;
8use std::sync::{Arc, Mutex};
9use std::thread::{spawn, JoinHandle};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use super::child_reader_thread;
13
14#[derive(Debug)]
15pub struct JournalMsg {
16    pub seq: u64,
17    pub at: SystemTime,
18    pub priority: u32,
19    pub unit: String,
20    pub msg: String,
21}
22
23type JournalNotifyFn = Box<dyn FnMut(&VecDeque<JournalMsg>, bool) + Send>;
24
25fn parse_journal_msg(line: &str, seq: u64) -> Result<JournalMsg> {
26    let parsed = json::parse(line)?;
27    let at_us: u64 = parsed["__REALTIME_TIMESTAMP"]
28        .as_str()
29        .unwrap_or("0")
30        .parse()
31        .unwrap_or(0);
32    let priority: u32 = parsed["PRIORITY"]
33        .as_str()
34        .unwrap_or("0")
35        .parse()
36        .unwrap_or(0);
37    let unit = parsed["_SYSTEMD_UNIT"].as_str().unwrap_or("UNKNOWN");
38
39    let msg = match &parsed["MESSAGE"] {
40        json::JsonValue::Short(v) => v.to_string(),
41        json::JsonValue::String(v) => v.to_string(),
42        json::JsonValue::Array(ar) => {
43            let u8_ar: Vec<u8> = ar.iter().map(|x| x.as_u8().unwrap_or('?' as u8)).collect();
44            std::ffi::OsStr::from_bytes(&u8_ar).to_string_lossy().into()
45        }
46        v => format!("<UNKNOWN> {:?}", &v),
47    };
48
49    Ok(JournalMsg {
50        seq,
51        at: UNIX_EPOCH + Duration::from_micros(at_us),
52        priority,
53        unit: unit.to_string(),
54        msg: msg.to_string(),
55    })
56}
57
58struct JournalTailWorker {
59    retention: usize,
60    notify: JournalNotifyFn,
61    msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
62    term_rx: Receiver<()>,
63    seq_cursor: u64,
64}
65
66impl JournalTailWorker {
67    fn new(
68        retention: usize,
69        notify: JournalNotifyFn,
70        msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
71        term_rx: Receiver<()>,
72    ) -> Self {
73        Self {
74            retention,
75            notify,
76            msgs,
77            term_rx,
78            seq_cursor: 1,
79        }
80    }
81
82    fn process(&mut self, line: String, flush: bool) {
83        let msg = match parse_journal_msg(&line, self.seq_cursor) {
84            Ok(v) => v,
85            Err(e) => {
86                error!(
87                    "journal: Failed to parse journal output {:?} ({:?})",
88                    &line, &e
89                );
90                return;
91            }
92        };
93        self.seq_cursor += 1;
94        let mut msgs = self.msgs.lock().unwrap();
95        msgs.push_front(msg);
96        (self.notify)(&*msgs, flush);
97        msgs.truncate(self.retention);
98    }
99
100    fn run(mut self, mut jctl_cmd: process::Command) {
101        let mut jctl = jctl_cmd.spawn().unwrap();
102        let jctl_stdout = jctl.stdout.take().unwrap();
103        let (line_tx, line_rx) = channel::unbounded::<String>();
104        let jh = spawn(move || child_reader_thread("journal".into(), jctl_stdout, line_tx));
105
106        loop {
107            select! {
108                recv(line_rx) -> res => {
109                    match res {
110                        Ok(line) => self.process(line, line_rx.is_empty()),
111                        Err(e) => {
112                            warn!("journal: reader thread failed ({:?})", &e);
113                            break;
114                        }
115                    }
116                },
117                recv(self.term_rx) -> term => {
118                    if let Err(e) = term {
119                        debug!("journal: Term ({})", &e);
120                        break;
121                    }
122                },
123            };
124        }
125
126        drop(line_rx);
127        let _ = jctl.kill();
128        let _ = jctl.wait();
129        jh.join().unwrap();
130    }
131}
132
133pub struct JournalTailer {
134    pub msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
135    term_tx: Option<Sender<()>>,
136    jh: Option<JoinHandle<()>>,
137}
138
139impl JournalTailer {
140    pub fn new(units: &[&str], retention: usize, notify: JournalNotifyFn) -> Self {
141        let msgs = Arc::new(Mutex::new(VecDeque::<JournalMsg>::new()));
142        let (term_tx, term_rx) = channel::unbounded::<()>();
143        let worker = JournalTailWorker::new(retention, notify, msgs.clone(), term_rx);
144
145        let mut cmd = process::Command::new("journalctl");
146        cmd.args(&["-o", "json", "-f", "-n", &format!("{}", retention)]);
147        for unit in units.iter() {
148            cmd.args(&["-u", unit]);
149        }
150        cmd.stdout(process::Stdio::piped());
151
152        let jh = spawn(move || worker.run(cmd));
153
154        Self {
155            msgs,
156            term_tx: Some(term_tx),
157            jh: Some(jh),
158        }
159    }
160}
161
162impl Drop for JournalTailer {
163    fn drop(&mut self) {
164        drop(self.term_tx.take().unwrap());
165        let _ = self.jh.take().unwrap().join();
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::JournalTailer;
172    use log::info;
173    use std::thread::sleep;
174    use std::time::Duration;
175
176    #[test]
177    fn test() {
178        let _ = ::env_logger::try_init();
179        let s = JournalTailer::new(
180            &vec!["rd-hashd-A.service", "rd-sideload.service"],
181            10,
182            Box::new(|msg, flush| info!("notified {:?} flush={:?}", msg, flush)),
183        );
184        sleep(Duration::from_secs(10));
185        drop(s);
186    }
187}