use rayon::prelude::*;
#[cfg(feature = "tracing")]
use tracing::{
field::{Field, Visit},
span,
span::{Attributes, Record},
Event, Id, Level, Subscriber,
};
#[cfg(feature = "tracing")]
use tracing_subscriber::{
layer::{Context, Layer, SubscriberExt},
registry::LookupSpan,
util::SubscriberInitExt,
};
#[cfg(feature = "tracing")]
use std::{
collections::{HashMap, HashSet},
io::Write,
sync::Arc,
};
use std::{fs::File, io::Read, path::Path, sync::Mutex};
/// Collects the fields visited on a span, a span record, or an event as
/// `(field name, rendered value)` pairs, in the order they are visited.
#[derive(Default)]
#[cfg(feature = "tracing")]
struct FieldVisitor {
    fields: Vec<(&'static str, String)>,
}
#[cfg(feature = "tracing")]
impl Visit for FieldVisitor {
    /// Stores string values as-is.
    ///
    /// Without this override, `Visit::record_str` falls back to `record_debug`. The
    /// `Debug` rendering of a `&str` is quoted and escaped, so a field `name = "x"` would
    /// be logged as the JSON string `"\"x\""` instead of `"x"`.
    fn record_str(&mut self, field: &Field, value: &str) {
        self.fields.push((field.name(), value.to_owned()));
    }

    /// Stores every value without a dedicated override above using its `Debug` rendering.
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        self.fields.push((field.name(), format!("{:?}", value)));
    }
}
/// What the layer keeps about one span: its name and the fields recorded on it.
#[cfg(feature = "tracing")]
struct SpanData {
/// The span's name, taken from its metadata.
name: String,
/// `(field name, rendered value)` pairs in recording order. The same name may appear
/// more than once when a field is recorded again after the span was created.
fields: Vec<(&'static str, String)>,
}
/// `tracing` layer that writes each event as one JSON line, together with the
/// names and fields of the spans enclosing it.
#[cfg(feature = "tracing")]
struct PruningLayer {
/// Recorded name and fields of each span, keyed by span id.
spans: Mutex<HashMap<Id, SpanData>>,
/// Ids of spans inside which at least one event has been emitted.
relevant_spans: Mutex<HashSet<Id>>,
/// Destination of the JSON lines. It is shared with `PruningGuard`, which flushes it on drop.
file: Arc<Mutex<Box<dyn Write + Send>>>,
}
/// Flushes the layer's output writer when dropped.
///
/// The writer is shared with the layer. The layer may never be dropped itself (for
/// example, when it is installed as the global subscriber), so this guard is what
/// pushes any buffered output out at the end of the program.
#[cfg(feature = "tracing")]
struct PruningGuard {
    file: Arc<Mutex<Box<dyn Write + Send>>>,
}
#[cfg(feature = "tracing")]
impl Drop for PruningGuard {
    fn drop(&mut self) {
        // A poisoned lock only means some writer panicked while holding it. The lines
        // written before that are still worth flushing, so recover the guard rather
        // than skipping the flush.
        let mut file = self
            .file
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // `drop` must not panic, and there is nowhere left to report a flush error.
        let _ = file.flush();
    }
}
#[cfg(feature = "tracing")]
impl PruningLayer {
    /// Builds the layer and the guard that flushes its output.
    ///
    /// With `Some(path)`, JSON lines go to a file created (or truncated) at `path`,
    /// behind a `BufWriter`. With `None`, they go to standard output. The returned
    /// [`PruningGuard`] shares the writer with the layer and flushes it when dropped,
    /// so keep it alive for as long as events should be written.
    ///
    /// # Panics
    /// Panics if the output file cannot be created.
    fn new(path: Option<&str>) -> (Self, PruningGuard) {
        let writer: Box<dyn Write + Send> = match path {
            Some(p) => {
                let file =
                    File::create(p).unwrap_or_else(|err| panic!("error creating {}\n{err}", p));
                // serde_json hands the JSON to the writer in many small pieces. Buffering
                // keeps a file sink from paying one write syscall per piece; the guard
                // performs the final flush.
                Box::new(std::io::BufWriter::new(file))
            }
            None => Box::new(std::io::stdout()),
        };
        let file = Arc::new(Mutex::new(writer));
        let layer = Self {
            spans: Mutex::new(HashMap::new()),
            relevant_spans: Mutex::new(HashSet::new()),
            file: Arc::clone(&file),
        };
        (layer, PruningGuard { file })
    }
}
#[cfg(feature = "tracing")]
impl<S> Layer<S> for PruningLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Stores the name and initial field values of a newly created span.
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
        let mut visitor = FieldVisitor::default();
        attrs.record(&mut visitor);
        let data = SpanData {
            name: attrs.metadata().name().to_string(),
            fields: visitor.fields,
        };
        let mut spans = self.spans.lock().unwrap();
        spans.insert(id.clone(), data);
    }

    /// Adds values recorded on an existing span after its creation.
    ///
    /// The new values are appended, not replaced. When the span is serialized, the last
    /// value recorded for a given name wins.
    fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
        let mut visitor = FieldVisitor::default();
        values.record(&mut visitor);
        let mut spans = self.spans.lock().unwrap();
        if let Some(span) = spans.get_mut(id) {
            span.fields.extend(visitor.fields);
        }
    }

    /// Writes the event as one JSON line: its name, its fields, and the spans enclosing
    /// it, outermost first.
    ///
    /// # Panics
    /// Panics if a lock is poisoned or the JSON line cannot be written.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        let mut visitor = FieldVisitor::default();
        event.record(&mut visitor);
        let chain: Vec<Id> = match ctx.event_scope(event) {
            Some(scope) => scope.from_root().map(|s| s.id()).collect(),
            None => Vec::new(),
        };
        // The temporary guard is released at the end of this statement.
        self.relevant_spans
            .lock()
            .unwrap()
            .extend(chain.iter().cloned());

        // This is the only place two of the layer's locks are held at once (`file`,
        // then `spans`), so the layer cannot deadlock on its own locks.
        let mut file = self.file.lock().unwrap();
        let spans = self.spans.lock().unwrap();
        let trace: Vec<LogSpan> = chain
            .iter()
            .map(|span_id| {
                // Spans in an event's scope are still open, so `on_new_span` has stored
                // them and `on_close` has not removed them yet.
                let span = spans
                    .get(span_id)
                    .expect("span in event scope was never recorded");
                LogSpan {
                    span: span.name.clone(),
                    meta: span.fields.iter().cloned().collect(),
                }
            })
            .collect();
        serde_json::to_writer(
            &mut *file,
            &LogEvent {
                trace,
                event: event.metadata().name().to_string(),
                meta: visitor.fields.into_iter().collect(),
            },
        )
        .unwrap();
        file.write_all(b"\n").unwrap();
    }

    /// Forgets a span once it has closed.
    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
        // Spans that enclosed events are dropped as well. A closed span can never be
        // part of a later event's scope, so its data would never be read again.
        // Retaining it would only make `spans` and `relevant_spans` grow for the whole run.
        self.spans.lock().unwrap().remove(&id);
        self.relevant_spans.lock().unwrap().remove(&id);
    }
}
/// One JSON line of trace output: `{"event": <name>, <event fields>..., "trace": [...]}`.
///
/// The event's own fields are flattened into the top-level object.
/// NOTE(review): an event field named `event` or `trace` would sit next to the fixed
/// keys and produce duplicate JSON keys. Confirm no such field names are used.
#[derive(serde::Serialize)]
#[cfg(feature = "tracing")]
struct LogEvent {
/// The event's name, taken from its metadata.
event: String,
/// The event's fields, flattened into the enclosing JSON object.
#[serde(flatten)]
meta: HashMap<&'static str, String>,
/// The spans enclosing the event, outermost first.
trace: Vec<LogSpan>,
}
/// One enclosing span inside a `LogEvent`'s `trace`, serialized as
/// `{"span": <name>, <span fields>...}`.
#[derive(serde::Serialize)]
#[cfg(feature = "tracing")]
struct LogSpan {
/// The span's name.
span: String,
/// The span's fields, flattened into the span's JSON object. The map holds a single
/// value per field name.
#[serde(flatten)]
meta: HashMap<&'static str, String>,
}
/// Splits an mbox file into the raw bytes of its individual messages.
///
/// Every line starting with `From ` is treated as a message separator. The separator
/// line itself is dropped, and each message runs up to, and includes, the newline that
/// precedes the next separator. Bytes before the first separator, if any, are returned
/// as a message of their own. Empty messages are never returned, so empty input yields
/// an empty `Vec`.
///
/// No `>From ` un-escaping is performed; the returned slices borrow `input` verbatim.
fn parse_mbox(input: &[u8]) -> Vec<&[u8]> {
    let mut res = Vec::new();
    let mut start = 0usize;
    let mut pos = 0usize;
    for line in input.split(|b| *b == b'\n') {
        // +1 accounts for the '\n' consumed by `split`. On the final segment this
        // overshoots `input.len()` by one, hence the clamp after the loop.
        let line_len = line.len() + 1;
        if line.starts_with(b"From ") {
            if start < pos {
                res.push(&input[start..pos]);
            }
            start = pos + line_len;
        }
        pos += line_len;
    }
    // Clamp before comparing. Otherwise the phantom '\n' of the final segment makes an
    // empty tail (empty input, or a trailing "From ...\n") look like a message.
    let end = pos.min(input.len());
    if start < end {
        res.push(&input[start..end]);
    }
    res
}
/// Recursively appends every non-directory path found below `path` to `v`.
///
/// If `path` is not a directory (including when it does not exist), nothing is
/// appended and `Ok(())` is returned. Directory symlinks are descended into, because
/// `Path::is_dir` follows them. Paths are appended in `read_dir` order, which the
/// platform does not specify.
///
/// # Errors
/// Returns the first I/O error hit while listing a directory or reading one of its entries.
fn dir_entries(path: &std::path::Path, v: &mut Vec<std::path::PathBuf>) -> std::io::Result<()> {
    if !path.is_dir() {
        return Ok(());
    }
    std::fs::read_dir(path)?.try_for_each(|item| {
        let child = item?.path();
        if child.is_dir() {
            dir_entries(&child, v)
        } else {
            v.push(child);
            Ok(())
        }
    })
}
/// Reports whether `path` has the extension `ext` (given without the leading dot).
///
/// The comparison ignores ASCII case, so `"box.MBOX"` matches `"mbox"`. Paths without
/// an extension never match, and that includes dot-files such as `".mbox"`.
fn extension_is(path: &str, ext: &str) -> bool {
    match Path::new(path).extension() {
        Some(found) => found.eq_ignore_ascii_case(ext),
        None => false,
    }
}
/// Parses every path given on the command line with `eml_codec`.
///
/// Each argument is dispatched by kind:
/// - a directory: every file below it, recursively, is parsed as one email, in parallel;
/// - `*.mbox`: split into messages with `parse_mbox`, then parsed in parallel;
/// - `*.zip`: every archive entry is parsed as one email, in parallel;
/// - `*.tar`: every archive entry is parsed as one email, sequentially;
/// - anything else: parsed as a single email.
///
/// Extensions are matched case-insensitively. Parse results are discarded, and progress
/// messages go to stderr. With the `tracing` feature, events are written as JSON lines
/// to stdout.
///
/// # Panics
/// Panics on any I/O or archive error. Failures to stat, open, or read an input file
/// name that file in the message.
fn main() {
    #[cfg(feature = "tracing")]
    let (layer, _guard) = PruningLayer::new(None);
    #[cfg(feature = "tracing")]
    tracing_subscriber::registry().with(layer).init();
    for path in std::env::args().skip(1) {
        let attr =
            std::fs::metadata(&path).unwrap_or_else(|err| panic!("error reading {}\n{err}", path));
        if attr.is_dir() {
            parse_directory(&path);
        } else if extension_is(&path, "mbox") {
            parse_mbox_file(&path);
        } else if extension_is(&path, "zip") {
            parse_zip_file(&path);
        } else if extension_is(&path, "tar") {
            parse_tar_file(&path);
        } else {
            parse_eml_file(&path);
        }
    }
}

/// Reads the whole input file at `path`. Panics with the path in the message on failure.
fn read_input(path: &Path) -> Vec<u8> {
    std::fs::read(path).unwrap_or_else(|err| panic!("error reading {}\n{err}", path.display()))
}

/// Opens the input file at `path`. Panics with the path in the message on failure.
fn open_input(path: &str) -> File {
    File::open(path).unwrap_or_else(|err| panic!("error opening {}\n{err}", path))
}

/// Parses every file below the directory `path` as a single email, in parallel.
fn parse_directory(path: &str) {
    let mut entries = Vec::new();
    dir_entries(Path::new(path), &mut entries)
        .unwrap_or_else(|err| panic!("failed listing files in {}.\n{err}", path));
    entries.par_iter().for_each(|file| {
        #[cfg(feature = "tracing")]
        let _span = span!(Level::TRACE, "file email", path = %file.display()).entered();
        eprintln!("parsing email {}", file.display());
        let input = read_input(file);
        let _eml = eml_codec::parse_message(&input);
    });
}

/// Splits the mailbox at `path` into messages and parses them in parallel.
fn parse_mbox_file(path: &str) {
    #[cfg(feature = "tracing")]
    let _span = span!(Level::TRACE, "mailbox", %path).entered();
    eprintln!("parsing mailbox: {}", path);
    let input = read_input(Path::new(path));
    let raw_emails = parse_mbox(&input);
    eprintln!("{} emails found", raw_emails.len());
    raw_emails
        .par_iter()
        .enumerate()
        .for_each(|(idx, raw_email)| {
            #[cfg(feature = "tracing")]
            let _span = span!(Level::TRACE, "mailbox email", %path, idx).entered();
            eprintln!("parsing mbox email {}", idx);
            let _eml = eml_codec::parse_message(raw_email);
        });
}

/// Parses every entry of the zip archive at `path` as a single email.
fn parse_zip_file(path: &str) {
    #[cfg(feature = "tracing")]
    let _span = span!(Level::TRACE, "zip", %path).entered();
    eprintln!("parsing zip file: {}", path);
    let archive = zip::ZipArchive::new(open_input(path)).unwrap();
    let nb_items = archive.len();
    let archive_lck = Mutex::new(archive);
    (0..nb_items).into_par_iter().for_each(|i| {
        // Reading an entry needs `&mut` access to the archive, so entries are extracted
        // one at a time under the lock. Only the parsing itself runs in parallel.
        let (_fpath, input) = {
            let mut archive = archive_lck.lock().unwrap();
            let mut file = archive.by_index(i).unwrap();
            eprintln!("parsing email {}", file.name());
            let mut input = Vec::new();
            file.read_to_end(&mut input).unwrap();
            (file.name().to_string(), input)
        };
        #[cfg(feature = "tracing")]
        let _span = span!(Level::TRACE, "zip email", %path, fpath = %_fpath).entered();
        let _eml = eml_codec::parse_message(&input);
    });
}

/// Parses every entry of the tar archive at `path` as a single email, sequentially.
fn parse_tar_file(path: &str) {
    #[cfg(feature = "tracing")]
    let _span = span!(Level::TRACE, "tar", %path).entered();
    eprintln!("parsing tar file: {}", path);
    let mut archive = tar::Archive::new(open_input(path));
    for ent in archive.entries_with_seek().unwrap() {
        let mut ent = ent.unwrap();
        let fpath = ent.path().unwrap().into_owned();
        let mut input = Vec::new();
        ent.read_to_end(&mut input).unwrap();
        eprintln!("parsing email {}", fpath.display());
        #[cfg(feature = "tracing")]
        let _span =
            span!(Level::TRACE, "tar email", %path, fpath = %fpath.display()).entered();
        let _eml = eml_codec::parse_message(&input);
    }
}

/// Parses the file at `path` as a single email.
fn parse_eml_file(path: &str) {
    #[cfg(feature = "tracing")]
    let _span = span!(Level::TRACE, "eml", %path).entered();
    eprintln!("parsing single email: {}", path);
    let input = read_input(Path::new(path));
    let _eml = eml_codec::parse_message(&input);
}