rd_util/
journal_tailer.rs1use anyhow::Result;
3use crossbeam::channel::{self, select, Receiver, Sender};
4use log::{debug, error, warn};
5use std::collections::VecDeque;
6use std::os::unix::ffi::OsStrExt;
7use std::process;
8use std::sync::{Arc, Mutex};
9use std::thread::{spawn, JoinHandle};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use super::child_reader_thread;
13
14#[derive(Debug)]
15pub struct JournalMsg {
16 pub seq: u64,
17 pub at: SystemTime,
18 pub priority: u32,
19 pub unit: String,
20 pub msg: String,
21}
22
23type JournalNotifyFn = Box<dyn FnMut(&VecDeque<JournalMsg>, bool) + Send>;
24
25fn parse_journal_msg(line: &str, seq: u64) -> Result<JournalMsg> {
26 let parsed = json::parse(line)?;
27 let at_us: u64 = parsed["__REALTIME_TIMESTAMP"]
28 .as_str()
29 .unwrap_or("0")
30 .parse()
31 .unwrap_or(0);
32 let priority: u32 = parsed["PRIORITY"]
33 .as_str()
34 .unwrap_or("0")
35 .parse()
36 .unwrap_or(0);
37 let unit = parsed["_SYSTEMD_UNIT"].as_str().unwrap_or("UNKNOWN");
38
39 let msg = match &parsed["MESSAGE"] {
40 json::JsonValue::Short(v) => v.to_string(),
41 json::JsonValue::String(v) => v.to_string(),
42 json::JsonValue::Array(ar) => {
43 let u8_ar: Vec<u8> = ar.iter().map(|x| x.as_u8().unwrap_or('?' as u8)).collect();
44 std::ffi::OsStr::from_bytes(&u8_ar).to_string_lossy().into()
45 }
46 v => format!("<UNKNOWN> {:?}", &v),
47 };
48
49 Ok(JournalMsg {
50 seq,
51 at: UNIX_EPOCH + Duration::from_micros(at_us),
52 priority,
53 unit: unit.to_string(),
54 msg: msg.to_string(),
55 })
56}
57
58struct JournalTailWorker {
59 retention: usize,
60 notify: JournalNotifyFn,
61 msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
62 term_rx: Receiver<()>,
63 seq_cursor: u64,
64}
65
66impl JournalTailWorker {
67 fn new(
68 retention: usize,
69 notify: JournalNotifyFn,
70 msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
71 term_rx: Receiver<()>,
72 ) -> Self {
73 Self {
74 retention,
75 notify,
76 msgs,
77 term_rx,
78 seq_cursor: 1,
79 }
80 }
81
82 fn process(&mut self, line: String, flush: bool) {
83 let msg = match parse_journal_msg(&line, self.seq_cursor) {
84 Ok(v) => v,
85 Err(e) => {
86 error!(
87 "journal: Failed to parse journal output {:?} ({:?})",
88 &line, &e
89 );
90 return;
91 }
92 };
93 self.seq_cursor += 1;
94 let mut msgs = self.msgs.lock().unwrap();
95 msgs.push_front(msg);
96 (self.notify)(&*msgs, flush);
97 msgs.truncate(self.retention);
98 }
99
100 fn run(mut self, mut jctl_cmd: process::Command) {
101 let mut jctl = jctl_cmd.spawn().unwrap();
102 let jctl_stdout = jctl.stdout.take().unwrap();
103 let (line_tx, line_rx) = channel::unbounded::<String>();
104 let jh = spawn(move || child_reader_thread("journal".into(), jctl_stdout, line_tx));
105
106 loop {
107 select! {
108 recv(line_rx) -> res => {
109 match res {
110 Ok(line) => self.process(line, line_rx.is_empty()),
111 Err(e) => {
112 warn!("journal: reader thread failed ({:?})", &e);
113 break;
114 }
115 }
116 },
117 recv(self.term_rx) -> term => {
118 if let Err(e) = term {
119 debug!("journal: Term ({})", &e);
120 break;
121 }
122 },
123 };
124 }
125
126 drop(line_rx);
127 let _ = jctl.kill();
128 let _ = jctl.wait();
129 jh.join().unwrap();
130 }
131}
132
133pub struct JournalTailer {
134 pub msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
135 term_tx: Option<Sender<()>>,
136 jh: Option<JoinHandle<()>>,
137}
138
139impl JournalTailer {
140 pub fn new(units: &[&str], retention: usize, notify: JournalNotifyFn) -> Self {
141 let msgs = Arc::new(Mutex::new(VecDeque::<JournalMsg>::new()));
142 let (term_tx, term_rx) = channel::unbounded::<()>();
143 let worker = JournalTailWorker::new(retention, notify, msgs.clone(), term_rx);
144
145 let mut cmd = process::Command::new("journalctl");
146 cmd.args(&["-o", "json", "-f", "-n", &format!("{}", retention)]);
147 for unit in units.iter() {
148 cmd.args(&["-u", unit]);
149 }
150 cmd.stdout(process::Stdio::piped());
151
152 let jh = spawn(move || worker.run(cmd));
153
154 Self {
155 msgs,
156 term_tx: Some(term_tx),
157 jh: Some(jh),
158 }
159 }
160}
161
162impl Drop for JournalTailer {
163 fn drop(&mut self) {
164 drop(self.term_tx.take().unwrap());
165 let _ = self.jh.take().unwrap().join();
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use super::JournalTailer;
172 use log::info;
173 use std::thread::sleep;
174 use std::time::Duration;
175
176 #[test]
177 fn test() {
178 let _ = ::env_logger::try_init();
179 let s = JournalTailer::new(
180 &vec!["rd-hashd-A.service", "rd-sideload.service"],
181 10,
182 Box::new(|msg, flush| info!("notified {:?} flush={:?}", msg, flush)),
183 );
184 sleep(Duration::from_secs(10));
185 drop(s);
186 }
187}