use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use notify::Watcher;
use rayon::{prelude::*, spawn};
use std::thread::sleep;
use std::time::Duration;
use std::{
io::{BufRead, Read, Seek},
path::Path,
process::Stdio,
sync::mpsc,
};
use crate::parser;
use crate::settings::RulesSettings;
use crate::{ast::AST, events::TuiEvent, parser::Parser, record::Record};
/// Central store of log records: everything read so far plus the filtered
/// subset currently visible in the UI.
#[derive(Debug, Default)]
pub struct RecordList {
/// Every record read from all sources, in arrival order.
pub all_records: Vec<Record>,
/// Records passing the active filter; these are clones of entries in
/// `all_records`, not references.
pub visible_records: Vec<Record>,
/// Extractors applied to each incoming line.
pub parsers: Vec<Parser>,
/// Filter expression currently applied, if any.
pub filter: Option<AST>,
/// PID of a child spawned by `readfile_exec`; its whole process group is
/// signalled on drop (see the `Drop` impl).
pub child_process: Option<u32>,
/// Cached length of the longest raw line among visible records;
/// 0 means "not computed yet" (see `max_record_size()`).
pub max_record_size: usize,
}
impl RecordList {
pub fn new() -> RecordList {
RecordList {
all_records: Vec::new(),
visible_records: Vec::new(),
parsers: vec![],
filter: None,
child_process: None,
max_record_size: 0,
}
}
pub fn readfile_gz(&mut self, filename: &str) {
let file = match std::fs::File::open(filename) {
Ok(file) => file,
Err(_error) => panic!("Could not open file={:?}", filename),
};
let reader = std::io::BufReader::new(file);
let mut decoder = flate2::read::GzDecoder::new(reader);
let mut buffer = String::new();
decoder.read_to_string(&mut buffer).unwrap();
let lines: Vec<String> = buffer.lines().map(|line| line.to_string()).collect();
let records: Vec<Record> = lines
.par_iter()
.enumerate()
.map(|(line_number, line)| {
let mut record = Record::new(line.clone());
record.set_data("filename", filename.to_string());
record.set_data("line_number", (line_number + 1).to_string());
record.parse(&self.parsers);
record
})
.collect();
self.visible_records = records.clone();
self.all_records.extend(records);
self.renumber();
self.max_record_size = self
.visible_records
.iter()
.map(|r| r.original.len())
.max()
.unwrap_or(0);
}
/// Load a plain-text log file: the first line is read sequentially, the
/// remainder is parsed in parallel, and a watcher is started so lines
/// appended later are streamed to `tx`.
///
/// Panics if the file cannot be opened or a line is not valid UTF-8.
pub fn readfile_parallel(&mut self, filename: &str, tx: mpsc::Sender<TuiEvent>) {
    let file = match std::fs::File::open(filename) {
        Ok(file) => file,
        Err(_error) => panic!("Could not open file={:?}", filename),
    };
    let mut reader = std::io::BufReader::new(file);
    // Record the current file size so the change watcher can resume
    // exactly where this initial load stopped.
    let file_size = reader.seek(std::io::SeekFrom::End(0)).unwrap();
    reader.seek(std::io::SeekFrom::Start(0)).unwrap();
    // The first line goes straight into `all_records`. Note that
    // `read_line` keeps the trailing newline in `first_line`.
    // NOTE(review): this record is never added to `visible_records` —
    // confirm whether skipping it from the visible list is intentional.
    let mut first_line = String::new();
    if let Ok(size) = reader.read_line(&mut first_line) {
        if size > 0 {
            let mut record = Record::new(first_line);
            record.set_data("filename", filename.to_string());
            record.set_data("line_number", "1".to_string());
            record.parse(&self.parsers);
            self.all_records.push(record);
        }
    }
    let lines: Vec<String> = reader.lines().map(|line| line.unwrap()).collect();
    let records: Vec<Record> = lines
        .par_iter()
        .enumerate()
        .map(|(line_number, line)| {
            let mut record = Record::new(line.clone());
            record.set_data("filename", filename.to_string());
            // These lines start at file line 2 (line 1 was consumed
            // above), hence `+ 2`: the previous `+ 1` assigned the
            // number "1" to both the first and the second file line.
            record.set_data("line_number", (line_number + 2).to_string());
            record.parse(&self.parsers);
            record
        })
        .collect();
    self.visible_records = records.clone();
    self.all_records.extend(records);
    self.renumber();
    // Watch for appended data starting at the size we measured above.
    Self::wait_for_changes(filename.to_string(), tx, file_size.try_into().unwrap());
    self.max_record_size = self
        .visible_records
        .iter()
        .map(|r| r.original.len())
        .max()
        .unwrap_or(0);
}
/// Watch `filename` for modifications; on each change, read any newly
/// appended lines from `position` onward and forward them to `tx` as
/// `TuiEvent::NewRecord`s. Runs until the process exits.
pub fn wait_for_changes(filename: String, tx: mpsc::Sender<TuiEvent>, position: usize) {
    let tx_clone = tx.clone();
    // Use a dedicated OS thread: this loop blocks forever on `rx.recv()`,
    // and parking a rayon pool worker on it (the previous `spawn`) would
    // permanently starve the pool used for parallel parsing.
    std::thread::spawn(move || {
        let mut position = position;
        let (tx, rx) = mpsc::channel();
        let mut watcher = notify::recommended_watcher(tx).unwrap();
        watcher
            .watch(Path::new(&filename), notify::RecursiveMode::NonRecursive)
            .unwrap();
        loop {
            match rx.recv() {
                // Only modifications matter; create/remove/etc. events
                // are ignored.
                Ok(Ok(event)) => {
                    if let notify::EventKind::Modify(_) = event.kind {
                        position =
                            Self::read_and_send_new_lines(&filename, &tx_clone, position);
                    }
                }
                // A notify-side error: report it instead of panicking
                // (the previous code `unwrap()`ed the inner Result).
                Ok(Err(e)) => println!("watch error: {:?}", e),
                Err(e) => println!("watch error: {:?}", e),
            }
        }
    });
}
/// Read lines from `filename` starting at byte `position`, send each as a
/// `TuiEvent::NewRecord`, and return the new end-of-file position to use
/// for the next incremental read.
///
/// Panics if the file cannot be opened or a line cannot be read.
pub fn read_and_send_new_lines(
    filename: &str,
    tx: &mpsc::Sender<TuiEvent>,
    position: usize,
) -> usize {
    let file = std::fs::File::open(filename).expect("could not open file");
    let mut reader = std::io::BufReader::new(file);
    // Find the current end of the file, then rewind to where the previous
    // read stopped so only the newly appended bytes are processed.
    let end_position = reader.seek(std::io::SeekFrom::End(0)).unwrap();
    reader
        .seek(std::io::SeekFrom::Start(position as u64))
        .unwrap();
    // NOTE(review): numbering restarts at 1 for every batch of appended
    // lines; the receiver appears to renumber records on arrival (see
    // `add_record`) — confirm.
    for (offset, line) in reader.lines().enumerate() {
        let line = line.expect("could not read line");
        // `line` is moved into the record directly; the previous
        // `line.clone()` allocated a copy that was never used again.
        let mut record = Record::new(line);
        record.set_data("filename", filename.to_string());
        record.set_line_number(offset + 1);
        tx.send(TuiEvent::NewRecord(record)).unwrap();
    }
    end_position as usize
}
/// Stream records from stdin, sending each line to `tx` as a
/// `TuiEvent::NewRecord` until EOF.
pub fn readfile_stdin(&mut self, tx: mpsc::Sender<TuiEvent>) {
    // A dedicated OS thread: reading stdin blocks until EOF, which would
    // otherwise tie up a rayon pool worker (the previous `spawn`)
    // indefinitely and starve parallel parsing.
    std::thread::spawn(move || {
        let reader = std::io::stdin();
        let reader = reader.lock();
        for line in reader.lines() {
            let line = line.expect("could not read line");
            let record = Record::new(line);
            tx.send(TuiEvent::NewRecord(record)).unwrap();
        }
    });
}
/// Parse and store a single incoming record, optionally running it
/// through the Lua record processors, and append it to the visible list
/// when it passes the active filter.
pub fn add_record(
    &mut self,
    mut record: Record,
    lua_engine: Option<&mut crate::lua_engine::LuaEngine>,
) {
    // A shared borrow is all `parse` needs; the old `&mut self.parsers`
    // only compiled via implicit reborrow and was inconsistent with every
    // other call site.
    record.parse(&self.parsers);
    record.set_data("line_number", (self.all_records.len() + 1).to_string());
    if let Some(engine) = lua_engine {
        // Processor failures are reported but never drop the record.
        if let Err(e) = engine.execute_record_processors(&mut record) {
            eprintln!("Error executing record processors: {}", e);
        }
    }
    self.max_record_size = self.max_record_size.max(record.original.len());
    // Only clone when the record is also going into the visible list;
    // filtered-out records move straight into `all_records` (the old code
    // cloned unconditionally).
    let is_visible = match &self.filter {
        None => true,
        Some(filter) => record.matches(filter),
    };
    if is_visible {
        // Clone before `set_line_number` so `all_records` keeps the
        // record exactly as before (display index only on the visible
        // copy), matching the original behavior.
        self.all_records.push(record.clone());
        record.set_line_number(self.visible_records.len() + 1);
        self.visible_records.push(record);
    } else {
        self.all_records.push(record);
    }
}
/// Run an external command and stream its stdout/stderr lines to `tx`;
/// when the child exits, a marked "EXIT: …" record is emitted. The child
/// PID is stored so `Drop` can terminate it.
///
/// Panics if the command cannot be spawned.
pub fn readfile_exec(&mut self, args: &[&str], tx: mpsc::Sender<TuiEvent>) {
    // Run under `setsid` so the child leads its own process group; the
    // Drop impl signals the whole group via the stored PID.
    let mut child = std::process::Command::new("setsid")
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .expect("could not execute command");
    let stdout = std::io::BufReader::new(child.stdout.take().expect("could not read stdout"));
    let stderr = std::io::BufReader::new(child.stderr.take().expect("could not read stderr"));
    let tx_stdout = tx.clone();
    let tx_exit = tx.clone();
    let tx_stderr = tx;
    // These loops block until the child closes the corresponding pipe,
    // so they get dedicated OS threads; the previous rayon `spawn`
    // parked up to three pool workers on blocking I/O, starving the
    // pool used for parallel parsing.
    std::thread::spawn(move || {
        for line in stdout.lines() {
            if let Ok(line) = line {
                let mut record = Record::new(line);
                record.set_data("filename", "stdout".into());
                tx_stdout.send(TuiEvent::NewRecord(record)).unwrap();
            } else {
                // Read error: the pipe is unusable, stop streaming.
                return;
            }
        }
    });
    std::thread::spawn(move || {
        for line in stderr.lines() {
            if let Ok(line) = line {
                let mut record = Record::new(line);
                record.set_data("filename", "stderr".into());
                tx_stderr.send(TuiEvent::NewRecord(record)).unwrap();
            } else {
                return;
            }
        }
    });
    // Capture the PID before `child` moves into the wait thread.
    let child_pid = child.id();
    std::thread::spawn(move || {
        let result = child.wait();
        // Give the pipe readers a moment to drain remaining output
        // before the exit marker is emitted.
        sleep(Duration::from_millis(100));
        let mut record = Record::new(format!("EXIT: {}", result.unwrap()));
        record.set_data("filename", "stderr".into());
        record.set_data("mark", "white red".into());
        tx_exit.send(TuiEvent::NewRecord(record)).unwrap();
    });
    self.child_process = Some(child_pid);
}
/// Apply `search` as the active filter: rebuild `visible_records` from
/// every stored record that matches, using the rayon pool.
pub fn filter_parallel(&mut self, search: AST) {
    // Match against the full record set in parallel, keeping clones of
    // the survivors for the visible list.
    let matching: Vec<Record> = self
        .all_records
        .par_iter()
        .filter(|record| record.matches(&search))
        .cloned()
        .collect();
    self.visible_records = matching;
    self.filter = Some(search);
    self.renumber();
}
/// Return the index of the first record at or after `start_at` that
/// matches `search`, or `None` if there is no match to the end.
pub fn search_forward(&mut self, search: &AST, start_at: usize) -> Option<usize> {
    // Scan the tail beginning at `start_at`; the position within the
    // tail is offset back to an absolute index.
    self.all_records
        .iter()
        .skip(start_at)
        .position(|record| record.matches(search))
        .map(|offset| offset + start_at)
}
/// Return the index of the last record at or before `start_at` that
/// matches `search`. A `start_at` of 0 means "search the whole list from
/// the end" (wrap-around behavior). Returns `None` when nothing matches.
pub fn search_backwards(&mut self, search: &AST, start_at: usize) -> Option<usize> {
    let upper = if start_at == 0 {
        self.all_records.len()
    } else {
        // Clamp so a stale cursor past the end cannot index out of
        // bounds (the old code indexed `all_records[start_at]` directly
        // and would panic when start_at >= len).
        (start_at + 1).min(self.all_records.len())
    };
    // `rposition` on the slice scans from the back; the returned index
    // is already absolute because the slice starts at 0.
    self.all_records[..upper]
        .iter()
        .rposition(|record| record.matches(search))
}
/// Reassign sequential display indices after any change to the visible
/// record set.
pub fn renumber(&mut self) {
    self.visible_records
        .iter_mut()
        .enumerate()
        .for_each(|(position, record)| record.index = position);
}
/// Re-run every record through the current parsers (e.g. after the
/// parser set changed), then renumber the visible list.
pub fn reparse(&mut self) {
    // `visible_records` holds clones of records, not references, so both
    // collections must be re-parsed independently.
    let parsers = &self.parsers;
    self.all_records
        .par_iter_mut()
        .for_each(|record| record.parse(parsers));
    self.visible_records
        .par_iter_mut()
        .for_each(|record| record.parse(parsers));
    self.renumber();
}
pub fn len(&self) -> usize {
return self.visible_records.len();
}
/// Drop all stored and visible records.
pub fn clear(&mut self) {
    self.all_records.clear();
    self.visible_records.clear();
    // Reset the cached maximum line width: it described records that no
    // longer exist and would otherwise leak into the next loaded file
    // (0 means "not computed", see `max_record_size()`).
    self.max_record_size = 0;
}
/// Checked access to a visible record by display index.
pub fn get(&self, index: usize) -> Option<&Record> {
    // Delegate to the slice's own bounds-checked accessor instead of
    // hand-rolling the length comparison.
    self.visible_records.get(index)
}
/// Width (in bytes) of the widest visible value: the cached raw-line
/// maximum when one was computed, otherwise the widest value stored
/// under `key` (records missing the key count as zero).
pub fn max_record_size(&self, key: &str) -> usize {
    // A non-zero cache means a loader already measured the raw lines.
    if self.max_record_size > 0 {
        return self.max_record_size;
    }
    self.visible_records
        .iter()
        .map(|record| record.get(key).map_or(0, |value| value.len()))
        .max()
        .unwrap_or(0)
}
}
// Terminate any external command started by `readfile_exec` when the list
// is dropped, so the child does not outlive the UI.
impl Drop for RecordList {
fn drop(&mut self) {
if let Some(pid) = self.child_process {
// The child was launched under `setsid`, so it leads its own process
// group; kill(2) with a negative PID delivers SIGTERM to the entire
// group. The result is deliberately ignored — the process may have
// already exited.
let _result = kill(Pid::from_raw(-(pid as i32)), Signal::SIGTERM);
}
}
}
/// Build a `Parser` for every extractor expression declared in `rule`,
/// appending them to `parsers` in declaration order.
///
/// # Errors
/// Returns the first `ParserError` produced by an invalid extractor;
/// parsers built before the failure remain in `parsers`.
pub fn load_parsers(
    rule: &RulesSettings,
    parsers: &mut Vec<parser::Parser>,
) -> Result<(), parser::ParserError> {
    for spec in &rule.extractors {
        parsers.push(parser::Parser::new(spec)?);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::settings::RulesSettings;
#[test]
fn test_load_parsers_with_transforms() {
    // Exercise every extractor flavor: logfmt, transforms, raw regex and
    // the pattern shorthand (which compiles down to a regex).
    let rule = RulesSettings {
        name: "test".to_string(),
        file_patterns: vec!["*.log".to_string()],
        extractors: vec![
            "logfmt".to_string(),
            "transform timestamp iso8601".to_string(),
            "regex (?P<level>\\w+)".to_string(),
            "transform timestamp rfc3339".to_string(),
            "pattern <timestamp> <message>".to_string(),
        ],
        filters: vec![],
        columns: vec![],
    };
    let mut parsers = Vec::new();
    assert!(load_parsers(&rule, &mut parsers).is_ok());
    assert_eq!(parsers.len(), 5);
    // Verify each spec produced the expected parser variant, in order.
    assert!(
        matches!(parsers[0], parser::Parser::LogFmt(_)),
        "Expected LogFmt parser"
    );
    assert!(
        matches!(parsers[1], parser::Parser::TransformTimestampIso8601),
        "Expected TransformTimestampIso8601 parser"
    );
    assert!(
        matches!(parsers[2], parser::Parser::Regex(_)),
        "Expected Regex parser"
    );
    assert!(
        matches!(parsers[3], parser::Parser::TransformTimestampIso8601),
        "Expected TransformTimestampIso8601 parser"
    );
    assert!(
        matches!(parsers[4], parser::Parser::Regex(_)),
        "Expected Regex parser from pattern"
    );
}
#[test]
fn test_load_parsers_with_invalid_parser() {
    // An unknown extractor name must surface as InvalidParser carrying
    // the offending spec string.
    let rule = RulesSettings {
        name: "test".to_string(),
        file_patterns: vec!["*.log".to_string()],
        extractors: vec!["logfmt".to_string(), "invalid_parser_type".to_string()],
        filters: vec![],
        columns: vec![],
    };
    let mut parsers = Vec::new();
    let result = load_parsers(&rule, &mut parsers);
    assert!(result.is_err());
    match result.unwrap_err() {
        parser::ParserError::InvalidParser(msg) => assert_eq!(msg, "invalid_parser_type"),
    }
}
#[test]
fn test_load_parsers_with_real_world_scenario() {
    // A realistic nginx access-log rule: a pattern extractor followed by
    // a timestamp transform.
    let rule = RulesSettings {
        name: "nginx".to_string(),
        file_patterns: vec!["nginx/*.log".to_string()],
        extractors: vec![
            "pattern <ip> - <user> [<timestamp>] \"<method> <url> <protocol>\" <status> <bytes> \"<referer>\" \"<user_agent>\"".to_string(),
            "transform timestamp iso8601".to_string(),
        ],
        filters: vec![],
        columns: vec![],
    };
    let mut parsers = Vec::new();
    assert!(load_parsers(&rule, &mut parsers).is_ok());
    assert_eq!(parsers.len(), 2);
    assert!(
        matches!(parsers[0], parser::Parser::Regex(_)),
        "Expected Regex parser from pattern"
    );
    assert!(
        matches!(parsers[1], parser::Parser::TransformTimestampIso8601),
        "Expected TransformTimestampIso8601 parser"
    );
}
#[test]
fn test_load_parsers_with_empty_file_scenario() {
    // The catch-all default rule: several regex extractors plus
    // autodatetime; only the parser count is asserted here.
    let rule = RulesSettings {
        name: "default".to_string(),
        file_patterns: vec![".*".to_string()],
        extractors: vec![
            "logfmt".to_string(),
            "regex (?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})".to_string(),
            "regex (?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z)"
                .to_string(),
            "regex (?P<level>info|warning|error|debug|warn)".to_string(),
            "regex (?P<date>\\d{4}-\\d{2}-\\d{2})".to_string(),
            "regex (?P<what>status|upgrade|startup)".to_string(),
            "autodatetime".to_string(),
        ],
        filters: vec![],
        columns: vec![],
    };
    let mut parsers = Vec::new();
    let outcome = load_parsers(&rule, &mut parsers);
    assert!(outcome.is_ok());
    assert_eq!(parsers.len(), 7);
}
}