use forge::signal::compactor;
use once_cell::sync::Lazy;
use regex::Regex;
/// Matches whole lines (multi-line mode, trailing newline included) that begin
/// with a `dd/dd/dd dd:dd:dd ` timestamp followed by `INFO`, `DEBUG`,
/// `WARN NativeCodeLoader` or `WARN util.NativeCodeLoader`.
///
/// The pattern is compiled lazily; `unwrap` is acceptable because the pattern
/// is a fixed literal.
static JVM_NOISE_RE: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"(?m)^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} (?:INFO|DEBUG|WARN NativeCodeLoader|WARN util\.NativeCodeLoader)[^\n]*\n?").unwrap()
});
/// Matches whole lines starting with `SLF4J:` (multi-line mode), including the
/// trailing newline when present.
static SLF4J_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^SLF4J:[^\n]*\n?").unwrap());
/// Matches whole lines starting with `log4j:WARN` (multi-line mode), including
/// the trailing newline when present.
static LOG4J_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^log4j:WARN[^\n]*\n?").unwrap());
/// Matches timestamped `INFO` lines whose text after `INFO ` starts with one of
/// the listed component names (`BlockManager`, `MemoryStore`, `TaskSetManager`,
/// `DAGScheduler`, `SparkContext`, `SparkUI`, ...), including the trailing
/// newline when present.
///
/// Every line this matches is also matched by the `INFO` alternative of
/// `JVM_NOISE_RE`.
static BLOCK_MANAGER_RE: Lazy<Regex> = Lazy::new(|| {
Regex::new(r"(?m)^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} INFO (?:BlockManager|BlockManagerInfo|MemoryStore|TaskSetManager|DAGScheduler|TaskSchedulerImpl|CoarseGrainedSchedulerBackend|SparkContext|SparkUI)[^\n]*\n?").unwrap()
});
/// Matches text that starts a line with `[Stage <digits>:` and runs up to the
/// next `]`, plus any whitespace and an optional line ending that follow.
///
/// NOTE(review): `[^\]]+` and `\s*` are not limited to a single line, so a
/// match may extend across newlines (e.g. swallowing following blank lines) —
/// confirm this is intended.
static STAGE_PROGRESS_RE: Lazy<Regex> =
Lazy::new(|| Regex::new(r"(?m)^\[Stage \d+:[^\]]+\]\s*\r?\n?").unwrap());
/// Returns `true` when `line` begins with a `dd/dd/dd dd:dd:dd` timestamp
/// (each `d` an ASCII digit), the same shape the noise regexes in this file
/// anchor on.
fn starts_with_log_timestamp(line: &str) -> bool {
    // `d` stands for "any ASCII digit"; every other byte must match literally.
    const SHAPE: &[u8] = b"dd/dd/dd dd:dd:dd";
    let bytes = line.as_bytes();
    bytes.len() >= SHAPE.len()
        && SHAPE.iter().zip(bytes).all(|(&want, &got)| match want {
            b'd' => got.is_ascii_digit(),
            _ => got == want,
        })
}

/// Compresses raw Spark command output down to the lines worth reading.
///
/// Processing steps:
/// 1. `raw` is passed through `compactor::normalise`.
/// 2. SLF4J / log4j banners, timestamped scheduler and JVM noise, and
///    `[Stage N:...]` progress bars are stripped with the module's regexes.
/// 3. Remaining non-blank lines are split into two groups: lines containing
///    `ERROR`, `Exception` or `FAILED`, and "result" lines. A line is a result
///    unless its trimmed text starts with a log timestamp, `INFO` or `DEBUG`.
/// 4. Error lines are emitted first, then result lines, each group in its
///    original order.
///
/// The timestamp test is shape-based (`dd/dd/dd dd:dd:dd`) rather than a list
/// of year prefixes, so leftover log lines are dropped regardless of the year
/// they were written in, and ordinary output that merely starts with digits
/// and a slash (e.g. `24/7 ...`) is kept.
///
/// # Parameters
/// * `subcmd` - currently unused.
/// * `raw` - the captured output to compress.
///
/// # Returns
/// * The kept lines joined with `\n`.
/// * If more than 50 lines were kept, only the first 50 followed by a
///   `… [N more lines]` marker.
/// * If nothing was kept, the stripped text passed through
///   `compactor::collapse_blanks`.
pub fn compress_spark(subcmd: &str, raw: &str) -> String {
    /// Maximum number of lines emitted before the output is truncated.
    const MAX_LINES: usize = 50;

    let _ = subcmd;

    let cleaned = compactor::normalise(raw);
    let s = SLF4J_RE.replace_all(&cleaned, "");
    let s = LOG4J_RE.replace_all(&s, "");
    let s = BLOCK_MANAGER_RE.replace_all(&s, "");
    let s = JVM_NOISE_RE.replace_all(&s, "");
    let s = STAGE_PROGRESS_RE.replace_all(&s, "");

    let mut error_lines: Vec<&str> = Vec::new();
    let mut result_lines: Vec<&str> = Vec::new();
    for line in s.lines() {
        let t = line.trim();
        if t.is_empty() {
            continue;
        }
        if t.contains("ERROR") || t.contains("Exception") || t.contains("FAILED") {
            error_lines.push(line);
        } else if !starts_with_log_timestamp(t)
            && !t.starts_with("INFO")
            && !t.starts_with("DEBUG")
        {
            result_lines.push(line);
        }
    }

    // Errors lead so they survive truncation; results follow.
    let mut out = error_lines;
    out.append(&mut result_lines);

    if out.is_empty() {
        return compactor::collapse_blanks(&s);
    }
    if out.len() > MAX_LINES {
        return format!(
            "{}\n… [{} more lines]",
            out[..MAX_LINES].join("\n"),
            out.len() - MAX_LINES
        );
    }
    out.join("\n")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Logger banners and timestamped `INFO` lines are removed while the
    /// job's own output is preserved.
    #[test]
    fn strips_jvm_and_slf4j_noise() {
        let raw = "SLF4J: Class path contains multiple SLF4J bindings.\nlog4j:WARN No appenders could be found for logger.\n24/01/01 12:00:00 INFO SparkContext: Running Spark version 3.4.1\n24/01/01 12:00:00 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy\nHello from Spark job!\n24/01/01 12:00:01 INFO SparkContext: Successfully stopped SparkContext\n";
        let out = compress_spark("", raw);

        // None of the noise markers may survive compression.
        for noise in ["SLF4J:", "log4j:WARN", "INFO SparkContext"] {
            assert!(!out.contains(noise), "{out}");
        }
        assert!(out.contains("Hello from Spark job"), "{out}");
    }

    /// An exception headline and its stack-trace frame both remain in the
    /// compressed output.
    #[test]
    fn keeps_error_lines() {
        let raw = "SLF4J: Class path contains multiple SLF4J bindings.\n24/01/01 12:00:00 INFO SparkContext: Running Spark\nException in thread \"main\" java.lang.NullPointerException\n\tat com.example.MyJob$.main(MyJob.scala:42)\n";
        let out = compress_spark("", raw);

        for expected in ["NullPointerException", "MyJob.scala:42"] {
            assert!(out.contains(expected), "{out}");
        }
    }
}