use anyhow::Result;
use crossbeam::channel::{self, select, Receiver, Sender};
use log::{debug, error, warn};
use std::collections::VecDeque;
use std::os::unix::ffi::OsStrExt;
use std::process;
use std::sync::{Arc, Mutex};
use std::thread::{spawn, JoinHandle};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use super::child_reader_thread;
#[derive(Debug)]
pub struct JournalMsg {
pub seq: u64,
pub at: SystemTime,
pub priority: u32,
pub unit: String,
pub msg: String,
}
type JournalNotifyFn = Box<dyn FnMut(&VecDeque<JournalMsg>, bool) + Send>;
fn parse_journal_msg(line: &str, seq: u64) -> Result<JournalMsg> {
let parsed = json::parse(line)?;
let at_us: u64 = parsed["__REALTIME_TIMESTAMP"]
.as_str()
.unwrap_or("0")
.parse()
.unwrap_or(0);
let priority: u32 = parsed["PRIORITY"]
.as_str()
.unwrap_or("0")
.parse()
.unwrap_or(0);
let unit = parsed["_SYSTEMD_UNIT"].as_str().unwrap_or("UNKNOWN");
let msg = match &parsed["MESSAGE"] {
json::JsonValue::Short(v) => v.to_string(),
json::JsonValue::String(v) => v.to_string(),
json::JsonValue::Array(ar) => {
let u8_ar: Vec<u8> = ar.iter().map(|x| x.as_u8().unwrap_or('?' as u8)).collect();
std::ffi::OsStr::from_bytes(&u8_ar).to_string_lossy().into()
}
v => format!("<UNKNOWN> {:?}", &v),
};
Ok(JournalMsg {
seq,
at: UNIX_EPOCH + Duration::from_micros(at_us),
priority,
unit: unit.to_string(),
msg: msg.to_string(),
})
}
struct JournalTailWorker {
retention: usize,
notify: JournalNotifyFn,
msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
term_rx: Receiver<()>,
seq_cursor: u64,
}
impl JournalTailWorker {
fn new(
retention: usize,
notify: JournalNotifyFn,
msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
term_rx: Receiver<()>,
) -> Self {
Self {
retention,
notify,
msgs,
term_rx,
seq_cursor: 1,
}
}
fn process(&mut self, line: String, flush: bool) {
let msg = match parse_journal_msg(&line, self.seq_cursor) {
Ok(v) => v,
Err(e) => {
error!(
"journal: Failed to parse journal output {:?} ({:?})",
&line, &e
);
return;
}
};
self.seq_cursor += 1;
let mut msgs = self.msgs.lock().unwrap();
msgs.push_front(msg);
(self.notify)(&*msgs, flush);
msgs.truncate(self.retention);
}
fn run(mut self, mut jctl_cmd: process::Command) {
let mut jctl = jctl_cmd.spawn().unwrap();
let jctl_stdout = jctl.stdout.take().unwrap();
let (line_tx, line_rx) = channel::unbounded::<String>();
let jh = spawn(move || child_reader_thread("journal".into(), jctl_stdout, line_tx));
loop {
select! {
recv(line_rx) -> res => {
match res {
Ok(line) => self.process(line, line_rx.is_empty()),
Err(e) => {
warn!("journal: reader thread failed ({:?})", &e);
break;
}
}
},
recv(self.term_rx) -> term => {
if let Err(e) = term {
debug!("journal: Term ({})", &e);
break;
}
},
};
}
drop(line_rx);
let _ = jctl.kill();
let _ = jctl.wait();
jh.join().unwrap();
}
}
pub struct JournalTailer {
pub msgs: Arc<Mutex<VecDeque<JournalMsg>>>,
term_tx: Option<Sender<()>>,
jh: Option<JoinHandle<()>>,
}
impl JournalTailer {
pub fn new(units: &[&str], retention: usize, notify: JournalNotifyFn) -> Self {
let msgs = Arc::new(Mutex::new(VecDeque::<JournalMsg>::new()));
let (term_tx, term_rx) = channel::unbounded::<()>();
let worker = JournalTailWorker::new(retention, notify, msgs.clone(), term_rx);
let mut cmd = process::Command::new("journalctl");
cmd.args(&["-o", "json", "-f", "-n", &format!("{}", retention)]);
for unit in units.iter() {
cmd.args(&["-u", unit]);
}
cmd.stdout(process::Stdio::piped());
let jh = spawn(move || worker.run(cmd));
Self {
msgs,
term_tx: Some(term_tx),
jh: Some(jh),
}
}
}
impl Drop for JournalTailer {
fn drop(&mut self) {
drop(self.term_tx.take().unwrap());
let _ = self.jh.take().unwrap().join();
}
}
#[cfg(test)]
mod tests {
use super::JournalTailer;
use log::info;
use std::thread::sleep;
use std::time::Duration;
#[test]
fn test() {
let _ = ::env_logger::try_init();
let s = JournalTailer::new(
&vec!["rd-hashd-A.service", "rd-sideload.service"],
10,
Box::new(|msg, flush| info!("notified {:?} flush={:?}", msg, flush)),
);
sleep(Duration::from_secs(10));
drop(s);
}
}