// bctx-weave 0.1.23
//
// bctx-weave — FilterMesh lens pipeline, CLI interception, domain compression
// Documentation
use forge::signal::compactor;
use once_cell::sync::Lazy;
use regex::Regex;

// JVM / log4j noise lines.
//
// Matches a whole line (including its trailing newline, if any) that begins
// with a `dd/dd/dd dd:dd:dd ` timestamp followed by one of:
//   - `INFO`
//   - `DEBUG`
//   - `WARN NativeCodeLoader` / `WARN util.NativeCodeLoader`
// Other `WARN` lines are not matched by this pattern.
static JVM_NOISE_RE: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"(?m)^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} (?:INFO|DEBUG|WARN NativeCodeLoader|WARN util\.NativeCodeLoader)[^\n]*\n?").unwrap()
});
// SLF4J binding warnings: any line that starts with `SLF4J:`, matched together
// with its trailing newline (if any).
static SLF4J_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^SLF4J:[^\n]*\n?").unwrap());
// "log4j:WARN ..." lines: any line that starts with `log4j:WARN`, matched
// together with its trailing newline (if any).
static LOG4J_RE: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?m)^log4j:WARN[^\n]*\n?").unwrap());
// Verbose scheduler / storage chatter, e.g. "INFO BlockManagerInfo:" or
// "INFO BlockManager:".
//
// Matches a timestamped `INFO` line whose logger name starts with one of the
// listed component names (the names are prefix matches; nothing anchors the end
// of the logger name). Every line matched here also satisfies `JVM_NOISE_RE`,
// which matches any timestamped `INFO` line.
static BLOCK_MANAGER_RE: Lazy<Regex> = Lazy::new(|| {
    Regex::new(r"(?m)^\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2} INFO (?:BlockManager|BlockManagerInfo|MemoryStore|TaskSetManager|DAGScheduler|TaskSchedulerImpl|CoarseGrainedSchedulerBackend|SparkContext|SparkUI)[^\n]*\n?").unwrap()
});
// Stage progress bars: "[Stage 0:=========>    (2 + 2) / 4]"
//
// Matches a line-initial `[Stage N:` up to the first `]`, plus any whitespace
// that follows. NOTE(review): `\s*` also matches newlines, so blank lines and
// the leading indentation of the next line are consumed along with the bar —
// confirm this is intended before tightening the pattern.
static STAGE_PROGRESS_RE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"(?m)^\[Stage \d+:[^\]]+\]\s*\r?\n?").unwrap());

// ── compress spark output ─────────────────────────────────────────────────────

/// Compresses raw Spark CLI output down to the lines worth reading.
///
/// Steps:
/// 1. `raw` is passed through `compactor::normalise`.
/// 2. SLF4J binding warnings, `log4j:WARN` lines, timestamped `INFO` / `DEBUG`
///    log lines (plus `NativeCodeLoader` warnings) and `[Stage N: …]` progress
///    bars are stripped with the module-level regexes.
/// 3. Of the remaining non-blank lines, those containing `ERROR`, `Exception`
///    or `FAILED` are emitted first, followed by every other line that is not
///    an `INFO` / `DEBUG` log line. Order is preserved within each group, so
///    other `WARN` lines and the job's own printed output are kept.
/// 4. At most 50 lines are returned; when more survive, a
///    `… [N more lines]` footer reports how many were cut.
///
/// If no line survives step 3, the stripped text is returned via
/// `compactor::collapse_blanks` instead.
///
/// `subcmd` is accepted for interface compatibility and is currently unused.
pub fn compress_spark(subcmd: &str, raw: &str) -> String {
    // Maximum number of lines emitted before the output is truncated.
    const MAX_LINES: usize = 50;

    let _ = subcmd;

    let cleaned = compactor::normalise(raw);
    let s = SLF4J_RE.replace_all(&cleaned, "");
    let s = LOG4J_RE.replace_all(&s, "");
    let s = BLOCK_MANAGER_RE.replace_all(&s, "");
    let s = JVM_NOISE_RE.replace_all(&s, "");
    // Progress bars carry no lasting information: every occurrence is removed.
    let s = STAGE_PROGRESS_RE.replace_all(&s, "");

    let mut error_lines: Vec<&str> = Vec::new();
    let mut result_lines: Vec<&str> = Vec::new();

    for line in s.lines() {
        let t = line.trim();
        if t.is_empty() {
            continue;
        }
        if t.contains("ERROR") || t.contains("Exception") || t.contains("FAILED") {
            error_lines.push(line);
        } else if !is_low_value_log_line(t) {
            // The anchored regexes above only catch log lines starting in
            // column 0; this catches indented INFO/DEBUG lines as well, without
            // depending on which year the timestamp happens to carry.
            result_lines.push(line);
        }
    }

    // Errors go first so they are the last thing lost to truncation.
    let mut out = error_lines;
    out.append(&mut result_lines);

    if out.is_empty() {
        return compactor::collapse_blanks(&s);
    }
    if out.len() > MAX_LINES {
        return format!(
            "{}\n… [{} more lines]",
            out[..MAX_LINES].join("\n"),
            out.len() - MAX_LINES
        );
    }
    out.join("\n")
}

/// Returns `true` when `line` (already trimmed) is an `INFO` or `DEBUG` log
/// line, with or without a leading `yy/MM/dd HH:mm:ss ` timestamp.
fn is_low_value_log_line(line: &str) -> bool {
    let message = strip_log_timestamp(line).unwrap_or(line);
    message.starts_with("INFO") || message.starts_with("DEBUG")
}

/// Strips a leading `dd/dd/dd dd:dd:dd ` timestamp (ASCII digits) from `line`,
/// returning the remainder, or `None` when `line` does not start with one.
fn strip_log_timestamp(line: &str) -> Option<&str> {
    // `d` stands for one ASCII digit; every other byte must match literally.
    const SHAPE: &[u8] = b"dd/dd/dd dd:dd:dd ";

    let bytes = line.as_bytes();
    if bytes.len() < SHAPE.len() {
        return None;
    }
    let is_timestamp = SHAPE.iter().zip(bytes).all(|(&shape, &byte)| match shape {
        b'd' => byte.is_ascii_digit(),
        literal => byte == literal,
    });
    if is_timestamp {
        // The matched prefix is pure ASCII, so this index is a char boundary.
        Some(&line[SHAPE.len()..])
    } else {
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Logging-framework chatter and Spark INFO lines must disappear while the
    /// job's own printed line is retained.
    #[test]
    fn strips_jvm_and_slf4j_noise() {
        let spark_log = "SLF4J: Class path contains multiple SLF4J bindings.\nlog4j:WARN No appenders could be found for logger.\n24/01/01 12:00:00 INFO SparkContext: Running Spark version 3.4.1\n24/01/01 12:00:00 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy\nHello from Spark job!\n24/01/01 12:00:01 INFO SparkContext: Successfully stopped SparkContext\n";
        let out = compress_spark("", spark_log);

        for noise in ["SLF4J:", "log4j:WARN", "INFO SparkContext"] {
            assert!(!out.contains(noise), "{out}");
        }
        assert!(out.contains("Hello from Spark job"), "{out}");
    }

    /// An uncaught exception and its stack frame must both survive compression.
    #[test]
    fn keeps_error_lines() {
        let spark_log = "SLF4J: Class path contains multiple SLF4J bindings.\n24/01/01 12:00:00 INFO SparkContext: Running Spark\nException in thread \"main\" java.lang.NullPointerException\n\tat com.example.MyJob$.main(MyJob.scala:42)\n";
        let out = compress_spark("", spark_log);

        for expected in ["NullPointerException", "MyJob.scala:42"] {
            assert!(out.contains(expected), "{out}");
        }
    }
}