// eml-codec 0.4.0
//
// Email enCOder DECoder in Rust. Supports the Internet Message Format and MIME (RFC 822, 5322, 2045, 2046, 2047, 2048, 2049, 6532).
// Documentation
use rayon::prelude::*;

#[cfg(feature = "tracing")]
use tracing::{
    field::{Field, Visit},
    span,
    span::{Attributes, Record},
    Event, Id, Level, Subscriber,
};
#[cfg(feature = "tracing")]
use tracing_subscriber::{
    layer::{Context, Layer, SubscriberExt},
    registry::LookupSpan,
    util::SubscriberInitExt,
};

#[cfg(feature = "tracing")]
use std::{
    collections::{HashMap, HashSet},
    io::Write,
    sync::Arc,
};
use std::{fs::File, io::Read, path::Path, sync::Mutex};

// This small tool implements:
// - a collector for the `tracing` events and spans emitted from the parser,
//   performing some form of "tail sampling": only events and the spans that
//   enclose them are kept; spans that do not contain any event are dropped.
// - output of the filtered events as JSON records (one output line = one event)
// - parallel parsing of collections of emails (from directories, mbox files,
//   etc.), to allow tracing events on large email corpora.

/// Accumulates the fields of a span or event as `(name, value)` pairs, in the
/// order in which they are recorded.
#[derive(Default)]
#[cfg(feature = "tracing")]
struct FieldVisitor {
    /// Field name paired with the `Debug` rendering of the field's value.
    fields: Vec<(&'static str, String)>,
}

#[cfg(feature = "tracing")]
impl Visit for FieldVisitor {
    /// Stores the field's name together with the `Debug` rendering of its
    /// value at the end of the collected field list.
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        let name = field.name();
        let rendered = format!("{value:?}");
        self.fields.push((name, rendered));
    }
}

/// What the pruning layer remembers about a span so that it can be attached
/// to the events recorded inside it.
#[cfg(feature = "tracing")]
struct SpanData {
    /// Span name, copied from the span's metadata when the span is created.
    name: String,
    /// Fields captured at creation time, followed by any fields recorded
    /// later on the span.
    fields: Vec<(&'static str, String)>,
}

/// The "pruning layer" performs tail sampling, filtering out spans that do not
/// contain any event.
#[cfg(feature = "tracing")]
struct PruningLayer {
    /// Name and fields of the spans seen so far, keyed by span id.
    spans: Mutex<HashMap<Id, SpanData>>,
    /// Ids of the spans that enclose at least one event.
    relevant_spans: Mutex<HashSet<Id>>,
    /// Output sink for the JSON records; the same handle is held by the
    /// `PruningGuard` returned from `PruningLayer::new`.
    file: Arc<Mutex<Box<dyn Write + Send>>>,
}

/// Guard returned alongside a `PruningLayer`; dropping it flushes the layer's
/// output sink.
#[cfg(feature = "tracing")]
struct PruningGuard {
    /// Handle to the same output sink as the layer this guard was created with.
    file: Arc<Mutex<Box<dyn Write + Send>>>,
}

#[cfg(feature = "tracing")]
impl Drop for PruningGuard {
    /// Flushes the output sink on a best-effort basis: nothing is flushed if
    /// the lock cannot be taken, and a flush error is ignored.
    fn drop(&mut self) {
        let Ok(mut sink) = self.file.lock() else {
            return;
        };
        let _ = sink.flush();
    }
}

#[cfg(feature = "tracing")]
impl PruningLayer {
    /// Creates the layer together with the guard that flushes its output.
    ///
    /// Records are written to the file at `path` (created or truncated) when
    /// a path is given, and to standard output otherwise. The file sink is
    /// buffered, so the returned guard must be kept alive until all events
    /// have been emitted: dropping it is what flushes the remaining records.
    ///
    /// # Panics
    ///
    /// Panics if the output file cannot be created.
    fn new(path: Option<&str>) -> (Self, PruningGuard) {
        let writer: Box<dyn Write + Send> = match path {
            Some(p) => {
                let out = File::create(p)
                    .unwrap_or_else(|err| panic!("failed creating trace output {p}\n{err}"));
                // Each record is serialized as many small writes; buffer them
                // instead of hitting the file once per write.
                Box::new(std::io::BufWriter::new(out))
            }
            None => Box::new(std::io::stdout()),
        };
        let file = Arc::new(Mutex::new(writer));
        let layer = Self {
            spans: Mutex::new(HashMap::new()),
            relevant_spans: Mutex::new(HashSet::new()),
            file: Arc::clone(&file),
        };
        (layer, PruningGuard { file })
    }
}

#[cfg(feature = "tracing")]
impl<S> Layer<S> for PruningLayer
where
    S: Subscriber + for<'a> LookupSpan<'a>,
{
    /// Remembers the name and the initial fields of every newly created span.
    fn on_new_span(&self, attrs: &Attributes<'_>, id: &Id, _ctx: Context<'_, S>) {
        let mut visitor = FieldVisitor::default();
        attrs.record(&mut visitor);
        let mut spans = self.spans.lock().unwrap();
        spans.insert(
            id.clone(),
            SpanData {
                name: attrs.metadata().name().to_string(),
                fields: visitor.fields,
            },
        );
    }

    /// Appends fields recorded after creation to the stored span data.
    /// Records for spans that are not tracked are ignored.
    fn on_record(&self, id: &Id, values: &Record<'_>, _ctx: Context<'_, S>) {
        let mut visitor = FieldVisitor::default();
        values.record(&mut visitor);

        let mut spans = self.spans.lock().unwrap();
        if let Some(span) = spans.get_mut(id) {
            span.fields.extend(visitor.fields);
        }
    }

    /// Writes the event as one JSON line, together with the chain of spans
    /// enclosing it (outermost first).
    ///
    /// # Panics
    ///
    /// Panics if a lock is poisoned or if writing the record fails.
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        let mut visitor = FieldVisitor::default();
        event.record(&mut visitor);

        // collect event ancestors
        let chain: Vec<_> = match ctx.event_scope(event) {
            Some(scope) => scope.from_root().map(|s| s.id()).collect(),
            None => Vec::new(),
        };

        // mark ancestor spans as used
        {
            let mut relevant_spans = self.relevant_spans.lock().unwrap();
            relevant_spans.extend(chain.iter().cloned());
        }

        // Snapshot the span data in its own scope so that the `spans` lock is
        // not held while doing I/O. A span without stored data is skipped
        // rather than aborting the whole run.
        let e_spans: Vec<LogSpan> = {
            let spans = self.spans.lock().unwrap();
            chain
                .iter()
                .filter_map(|span_id| spans.get(span_id))
                .map(|span| LogSpan {
                    span: span.name.clone(),
                    meta: span.fields.iter().cloned().collect(),
                })
                .collect()
        };

        let record = LogEvent {
            trace: e_spans,
            event: event.metadata().name().to_string(),
            meta: visitor.fields.into_iter().collect(),
        };

        // The record and its trailing newline are written under one lock so
        // that lines from concurrent threads do not interleave.
        let mut file = self.file.lock().unwrap();
        serde_json::to_writer(&mut *file, &record).unwrap();
        file.write_all(b"\n").unwrap();
    }

    /// Forgets everything known about a closed span.
    ///
    /// Events are serialized as soon as they occur, so this layer never reads
    /// a span's data after the span has closed. Dropping the entry here,
    /// whether or not the span contained events, keeps both maps bounded by
    /// the number of live spans instead of growing with the corpus.
    fn on_close(&self, id: Id, _ctx: Context<'_, S>) {
        let mut spans = self.spans.lock().unwrap();
        let mut relevant_spans = self.relevant_spans.lock().unwrap();
        spans.remove(&id);
        relevant_spans.remove(&id);
    }
}

/// Items of the output trace (list of json records, one `LogEvent` record per
/// line)
#[derive(serde::Serialize)]
#[cfg(feature = "tracing")]
struct LogEvent {
    /// Name taken from the event's metadata.
    event: String,
    /// Fields recorded on the event, flattened into the record itself.
    /// NOTE(review): a field named `event` or `trace` would presumably clash
    /// with the sibling keys of this record; confirm no such field is emitted.
    #[serde(flatten)]
    meta: HashMap<&'static str, String>,
    /// Spans enclosing the event, outermost first.
    trace: Vec<LogSpan>,
}

/// One enclosing span of an event, as it appears in the `trace` list of a
/// `LogEvent`.
#[derive(serde::Serialize)]
#[cfg(feature = "tracing")]
struct LogSpan {
    /// Name of the span.
    span: String,
    /// Fields of the span, flattened next to the `span` key.
    #[serde(flatten)]
    meta: HashMap<&'static str, String>,
}

// mbox parsing. A .mbox is the concatenation of emails, each prepended with a
// "From ..." line (which is not part of the email itself).
//
// Returns the raw bytes of each email, in file order, as sub-slices of
// `input`. Every line starting with "From " is treated as a separator and is
// excluded from the output; each email keeps its line terminators. Text found
// before the first separator is returned as an email of its own. Zero-length
// emails (empty input, consecutive separators, a separator at the very end)
// are never returned.
fn parse_mbox(input: &[u8]) -> Vec<&[u8]> {
    let mut res = Vec::new();
    // Offset of the first byte of the email currently being accumulated.
    let mut start = 0usize;
    // Offset of the first byte of the line currently being examined.
    let mut pos = 0usize;

    // `split_inclusive` keeps the '\n' in each line, so the offsets always
    // stay within `input` and an empty input yields no line at all.
    for line in input.split_inclusive(|b| *b == b'\n') {
        if line.starts_with(b"From ") {
            if start < pos {
                res.push(&input[start..pos]);
            }
            start = pos + line.len();
        }
        pos += line.len();
    }
    if start < pos {
        res.push(&input[start..pos]);
    }
    res
}

/// Recursively appends to `v` the path of every non-directory entry found
/// under `path`.
///
/// Does nothing (and succeeds) when `path` is not a directory.
///
/// # Errors
///
/// Returns the first I/O error hit while listing a directory; entries pushed
/// before the error remain in `v`.
fn dir_entries(path: &std::path::Path, v: &mut Vec<std::path::PathBuf>) -> std::io::Result<()> {
    if !path.is_dir() {
        return Ok(());
    }
    for item in std::fs::read_dir(path)? {
        let child = item?.path();
        if child.is_dir() {
            dir_entries(&child, v)?;
        } else {
            v.push(child);
        }
    }
    Ok(())
}

/// Tells whether the extension of `path` equals `ext`, ignoring ASCII case.
///
/// `ext` is given without the leading dot. Paths without an extension never
/// match.
fn extension_is(path: &str, ext: &str) -> bool {
    match Path::new(path).extension() {
        Some(found) => found.eq_ignore_ascii_case(ext),
        None => false,
    }
}

/// Parses every path given on the command line with
/// `eml_codec::parse_message`, discarding the parse results.
///
/// Each argument is handled according to what it is:
/// - a directory: walked recursively, every file parsed as one email;
/// - a `.mbox` file: split into emails with `parse_mbox`;
/// - a `.zip` or `.tar` file: every archive entry parsed as one email;
/// - anything else: parsed as a single email.
///
/// Extensions are matched case-insensitively. Progress is reported on
/// standard error. With the `tracing` feature enabled, a `PruningLayer`
/// writing to standard output is installed first and each email is parsed
/// inside a span identifying where it comes from.
///
/// # Panics
///
/// Panics on any I/O or archive error (unreadable path, directory listing
/// failure, invalid archive, ...).
fn main() {
    // `_guard` lives until the end of `main`; dropping it flushes the trace
    // output.
    #[cfg(feature = "tracing")]
    let (layer, _guard) = PruningLayer::new(None);
    #[cfg(feature = "tracing")]
    tracing_subscriber::registry().with(layer).init();

    for path in std::env::args().skip(1) {
        let attr =
            std::fs::metadata(&path).unwrap_or_else(|err| panic!("error reading {}\n{err}", path));
        if attr.is_dir() {
            // Directory: list all files first, then parse them in parallel.
            let mut entries = Vec::new();
            dir_entries(Path::new(&path), &mut entries)
                .unwrap_or_else(|err| panic!("failed listing files in {}.\n{err}", path));
            entries.par_iter().for_each(|path| {
                // The span guard stays alive until the end of the closure, so
                // the whole parse happens inside the span.
                #[cfg(feature = "tracing")]
                let _span = span!(Level::TRACE, "file email", path = %path.display()).entered();
                eprintln!("parsing email {}", path.display());
                let mut input = Vec::new();
                File::open(path).unwrap().read_to_end(&mut input).unwrap();
                let _eml = eml_codec::parse_message(&input);
            });
        } else if extension_is(&path, "mbox") {
            #[cfg(feature = "tracing")]
            let _span = span!(Level::TRACE, "mailbox", %path).entered();
            eprintln!("parsing mailbox: {}", path);
            // The whole mailbox is read into memory; the emails are slices of
            // that buffer.
            let mut input = Vec::new();
            File::open(&path).unwrap().read_to_end(&mut input).unwrap();
            let raw_emails = parse_mbox(&input);
            eprintln!("{} emails found", raw_emails.len());

            raw_emails
                .par_iter()
                .enumerate()
                .for_each(|(idx, raw_email)| {
                    // Each email gets its own span carrying the mailbox path
                    // and the email's index within the mailbox.
                    #[cfg(feature = "tracing")]
                    let _span = span!(Level::TRACE, "mailbox email", %path, idx).entered();
                    eprintln!("parsing mbox email {}", idx);
                    let _eml = eml_codec::parse_message(raw_email);
                })
        } else if extension_is(&path, "zip") {
            #[cfg(feature = "tracing")]
            let _span = span!(Level::TRACE, "zip", %path).entered();
            eprintln!("parsing zip file: {}", path);
            let archive = zip::ZipArchive::new(File::open(&path).unwrap()).unwrap();
            let nb_items = archive.len();
            // The archive is shared between the parallel workers behind a
            // mutex.
            let archive_lck = Mutex::new(archive);
            (0..nb_items).into_par_iter().for_each(|i| {
                let mut input = Vec::new();
                // Only read when the `tracing` feature is enabled, hence the
                // leading underscore.
                let mut _fpath = None;
                {
                    // The lock is held only while the entry is read into
                    // memory; it is released before parsing.
                    let mut archive = archive_lck.lock().unwrap();
                    let mut file = archive.by_index(i).unwrap();
                    eprintln!("parsing email {}", file.name());
                    _fpath = Some(file.name().to_string());
                    file.read_to_end(&mut input).unwrap();
                }
                #[cfg(feature = "tracing")]
                let _span =
                    span!(Level::TRACE, "zip email", %path, fpath = %_fpath.unwrap()).entered();
                let _eml = eml_codec::parse_message(&input);
            })
        } else if extension_is(&path, "tar") {
            #[cfg(feature = "tracing")]
            let _span = span!(Level::TRACE, "tar", %path).entered();
            eprintln!("parsing tar file: {}", path);
            let mut archive = tar::Archive::new(File::open(&path).unwrap());
            // Unlike the other containers, tar entries are read and parsed
            // one after the other in this loop.
            for ent in archive.entries_with_seek().unwrap() {
                let mut input = Vec::new();
                let mut ent = ent.unwrap();
                let fpath = ent.path().unwrap().into_owned();
                let _ = ent.read_to_end(&mut input).unwrap();
                eprintln!("parsing email {}", fpath.display());
                #[cfg(feature = "tracing")]
                let _span =
                    span!(Level::TRACE, "tar email", %path, fpath = %fpath.display()).entered();
                let _eml = eml_codec::parse_message(&input);
            }
        } else {
            // Fallback: any other file is treated as a single email.
            #[cfg(feature = "tracing")]
            let _span = span!(Level::TRACE, "eml", %path).entered();
            eprintln!("parsing single email: {}", path);
            let mut input = Vec::new();
            File::open(&path).unwrap().read_to_end(&mut input).unwrap();
            let _eml = eml_codec::parse_message(&input);
        }
    }
}