use anyhow::Result;
use chrono::Utc;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use serde::Serialize;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::io::{self, Write};
use std::time::Duration;
use crate::config::Config;
use crate::normalize::Normalizer;
use crate::patterns::{LogLine, Token};
/// Masks every occurrence of each `Token::Email` value in `original` with
/// the literal `<EMAIL>` placeholder. Other PII-like token kinds are left
/// untouched — only email tokens are masked here.
///
/// Matches are replaced from the end of the string toward the front so that
/// earlier byte offsets stay valid as the string shrinks/grows.
pub fn apply_pii_masking(original: &str, tokens: &[Token]) -> String {
    let mut result = original.to_string();
    let mut email_ranges = Vec::new();
    for token in tokens {
        if let Token::Email(email) = token {
            // Guard: an empty email value would match at every position and
            // never advance `start`, looping forever.
            if email.is_empty() {
                continue;
            }
            let mut start = 0;
            while let Some(pos) = result[start..].find(email.as_str()) {
                let abs_pos = start + pos;
                email_ranges.push((abs_pos, abs_pos + email.len()));
                start = abs_pos + email.len();
            }
        }
    }
    // Replace back-to-front; skip any range overlapping one already replaced
    // (possible when one email token is a substring of another), which would
    // otherwise splice at stale offsets or panic on a char boundary.
    email_ranges.sort_by(|a, b| b.0.cmp(&a.0));
    let mut last_replaced_start = usize::MAX;
    for (start, end) in email_ranges {
        if end > last_replaced_start {
            continue;
        }
        result.replace_range(start..end, "<EMAIL>");
        last_replaced_start = start;
    }
    result
}
/// A cluster of log lines that normalized to the same (or a similar) shape.
/// Groups accumulate in `PatternFolder::buffer` until flushed and rendered.
#[derive(Debug)]
struct PatternGroup {
    // Every raw line merged into this group, in arrival order; never empty.
    lines: Vec<LogLine>,
    #[allow(dead_code)]
    collapsed: bool,
    // Input position (1-based counter value) at which the group was opened.
    position: usize,
    // Input line number of the group's first line (equals `position`).
    first_line_no: usize,
    // Input line number of the most recently added line.
    last_line_no: usize,
}
impl PatternGroup {
    /// Opens a new group seeded with its first line at input position
    /// `position` (also recorded as the first/last line numbers).
    fn new(line: LogLine, position: usize) -> Self {
        Self {
            lines: vec![line],
            collapsed: false,
            position,
            first_line_no: position,
            last_line_no: position,
        }
    }

    /// Appends a matching line and records its input line number.
    fn add_line(&mut self, line: LogLine, line_no: usize) {
        self.lines.push(line);
        self.last_line_no = line_no;
    }

    /// True when the group has enough members to render in collapsed form.
    fn should_collapse(&self, min_collapse: usize) -> bool {
        self.lines.len() >= min_collapse
    }

    /// First line of the group. A group always holds at least one line
    /// (see `new`), so the `expect` documents an invariant, not a real path.
    fn first(&self) -> &LogLine {
        self.lines.first().expect("PatternGroup is never empty")
    }

    /// Most recently added line of the group.
    fn last(&self) -> &LogLine {
        self.lines.last().expect("PatternGroup is never empty")
    }

    /// Number of raw input lines merged into this group.
    fn count(&self) -> usize {
        self.lines.len()
    }
}
/// Streaming log folder: clusters similar lines into `PatternGroup`s and
/// renders them either as text (with `[+N similar …]` markers) or JSON.
pub struct PatternFolder {
    config: Config,
    normalizer: Normalizer,
    // Open groups not yet flushed to output.
    buffer: Vec<PatternGroup>,
    stats: FoldingStats,
    // 1-based count of input lines seen; doubles as the "position" stamp.
    position_counter: usize,
    // Raw lines queued for the parallel (rayon) processing path.
    batch_buffer: Vec<String>,
    // Monotonic id assigned to each emitted JSON group record.
    next_json_id: usize,
    rollup_computer: RollupComputer,
}
/// Counters accumulated over one folding run: input/output volume plus
/// per-category pattern hit counts. All fields start at zero via `Default`.
#[derive(Debug, Default)]
pub struct FoldingStats {
    /// Raw input lines consumed.
    pub total_lines: usize,
    /// Lines actually emitted after folding.
    pub output_lines: usize,
    /// Groups rendered in collapsed form.
    pub collapsed_groups: usize,
    /// Input lines elided thanks to collapsing.
    pub lines_saved: usize,
    /// Lines in which at least one token was detected.
    pub patterns_detected: usize,
    /// Per-category token hit counters.
    pub timestamps: usize,
    pub ips: usize,
    pub hashes: usize,
    pub uuids: usize,
    pub pids: usize,
    pub durations: usize,
    pub http_status: usize,
    pub sizes: usize,
    pub percentages: usize,
    pub paths: usize,
    pub kubernetes: usize,
    pub emails: usize,
}
/// JSON shape of the run summary. Field order is the serialization order —
/// keep it stable for downstream consumers.
#[derive(Serialize)]
struct StatsJson {
    input_lines: usize,
    output_lines: usize,
    // Percentage of input lines saved (0.0 when no input).
    compression_ratio: f64,
    collapsed_groups: usize,
    lines_saved: usize,
    patterns_detected: usize,
    elapsed_ms: u64,
    pattern_hits: PatternHits,
}
/// Per-category hit counts mirrored from `FoldingStats` for JSON output.
#[derive(Serialize)]
struct PatternHits {
    timestamps: usize,
    ips: usize,
    hashes: usize,
    uuids: usize,
    pids: usize,
    durations: usize,
    http_status: usize,
    sizes: usize,
    percentages: usize,
    paths: usize,
    kubernetes: usize,
    emails: usize,
}
/// A single raw line plus its 1-based input line number.
#[derive(Serialize)]
struct LineRef {
    line: String,
    line_no: usize,
}
/// First/last timestamps observed in a group, if any line carried one.
#[derive(Serialize)]
struct TimeRange {
    first_seen: Option<String>,
    last_seen: Option<String>,
}
/// One folded group, emitted as a `"type": "group"` JSON record.
#[derive(Serialize)]
struct GroupRecord {
    #[serde(rename = "type")]
    record_type: &'static str,
    id: usize,
    count: usize,
    // Token type names seen in the group's first and last lines (not all lines).
    token_types: Vec<&'static str>,
    normalized: String,
    first: LineRef,
    last: LineRef,
    time_range: TimeRange,
    variation: GroupRollup,
}
/// Trailing `"type": "summary"` record; stats fields are flattened inline.
#[derive(Serialize)]
struct SummaryRecord {
    #[serde(rename = "type")]
    record_type: &'static str,
    #[serde(flatten)]
    stats: StatsJson,
}
/// Stable, SCREAMING_SNAKE_CASE name for a token's kind. Used as rollup map
/// keys and in JSON `token_types`; exhaustive on purpose so adding a
/// `Token` variant forces an update here.
fn token_type_name(token: &Token) -> &'static str {
    match token {
        Token::Timestamp(_) => "TIMESTAMP",
        Token::IPv4(_) => "IPV4",
        Token::IPv6(_) => "IPV6",
        Token::Port(_) => "PORT",
        Token::Hash(_, _) => "HASH",
        Token::Uuid(_) => "UUID",
        Token::Pid(_) => "PID",
        Token::ThreadID(_) => "THREAD_ID",
        Token::Path(_) => "PATH",
        Token::Json(_) => "JSON",
        Token::Duration(_) => "DURATION",
        Token::Size(_) => "SIZE",
        Token::Number(_) => "NUMBER",
        Token::HttpStatus(_) => "HTTP_STATUS",
        Token::QuotedString(_) => "QUOTED_STRING",
        Token::Name(_) => "NAME",
        Token::KubernetesNamespace(_) => "K8S_NAMESPACE",
        Token::VolumeName(_) => "K8S_VOLUME",
        Token::PluginType(_) => "K8S_PLUGIN",
        Token::PodName(_) => "K8S_POD",
        Token::HttpStatusClass(_) => "HTTP_STATUS_CLASS",
        Token::BracketContext(_) => "BRACKET_CONTEXT",
        Token::KeyValuePair { .. } => "KEY_VALUE",
        Token::LogWithModule { .. } => "LOG_WITH_MODULE",
        Token::StructuredMessage { .. } => "STRUCTURED_MESSAGE",
        Token::Email(_) => "EMAIL",
    }
}
/// Returns a clone of the first `Timestamp` token's value, or `None` when
/// the slice carries no timestamp.
fn first_timestamp_in(tokens: &[Token]) -> Option<String> {
    for token in tokens {
        if let Token::Timestamp(value) = token {
            return Some(value.clone());
        }
    }
    None
}
// Maximum number of sample values drawn per token type for a group rollup.
const ROLLUP_K: usize = 7;
// Distinct-value tracking stops (entry is marked "capped") once this many
// distinct values have been seen for a single token type.
const ROLLUP_DISTINCT_CAP: usize = 64;
// Text output inlines sample values only when a token type has at most this
// many distinct values.
const ROLLUP_TEXT_SAMPLE_THRESHOLD: usize = 3;
/// Variation summary for one token type within a collapsed group.
#[derive(Serialize, Debug, Clone, PartialEq)]
struct VariationEntry {
    // Distinct values seen (saturates at ROLLUP_DISTINCT_CAP when capped).
    pub distinct_count: usize,
    // Up to ROLLUP_K deterministic sample values; empty for hash-only types.
    pub samples: Vec<String>,
    // True when tracking stopped at the distinct cap.
    pub capped: bool,
}
/// Rollup keyed by token type name; `BTreeMap` keeps output order stable.
type GroupRollup = BTreeMap<&'static str, VariationEntry>;
/// True for token kinds whose concrete values are worth showing as samples
/// in a rollup. Kinds not listed (timestamps, numbers, durations, …) are
/// only counted via hashes — their individual values carry little signal.
fn is_sample_worthy(token: &Token) -> bool {
    matches!(
        token,
        Token::Uuid(_)
            | Token::IPv4(_)
            | Token::IPv6(_)
            | Token::Path(_)
            | Token::Email(_)
            | Token::Hash(_, _)
            | Token::KubernetesNamespace(_)
            | Token::VolumeName(_)
            | Token::PluginType(_)
            | Token::PodName(_)
            | Token::QuotedString(_)
            | Token::Name(_)
            | Token::HttpStatus(_)
            | Token::HttpStatusClass(_)
            | Token::BracketContext(_)
            | Token::Json(_)
    )
}
/// Canonical string form of a token's value, used for distinct-value
/// tracking, sampling, and hashing. Exhaustive over all `Token` variants.
fn token_value_string(token: &Token) -> String {
    match token {
        // All single-string payloads share one arm via an or-pattern.
        Token::Timestamp(s)
        | Token::IPv4(s)
        | Token::IPv6(s)
        | Token::Uuid(s)
        | Token::Path(s)
        | Token::Json(s)
        | Token::Duration(s)
        | Token::Size(s)
        | Token::Number(s)
        | Token::QuotedString(s)
        | Token::Name(s)
        | Token::KubernetesNamespace(s)
        | Token::VolumeName(s)
        | Token::PluginType(s)
        | Token::PodName(s)
        | Token::ThreadID(s)
        | Token::HttpStatusClass(s)
        | Token::Email(s) => s.clone(),
        // Hash carries (algorithm, value); only the value is canonical here.
        Token::Hash(_, s) => s.clone(),
        Token::BracketContext(parts) => parts.join(","),
        Token::Port(p) => p.to_string(),
        Token::HttpStatus(s) => s.to_string(),
        Token::Pid(p) => p.to_string(),
        // Composite variants are flattened into "key=value"-style strings.
        Token::KeyValuePair { key, value_type } => format!("{key}={value_type}"),
        Token::LogWithModule { level, module } => format!("{level}:{module}"),
        Token::StructuredMessage { component, level } => format!("{component}:{level}"),
    }
}
/// FNV-1a (64-bit) hash of a token's canonical value string. Used to count
/// distinct values cheaply for token kinds that are not sample-worthy.
fn hash_token_value(token: &Token) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;
    token_value_string(token)
        .as_bytes()
        .iter()
        .fold(FNV_OFFSET, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        })
}
/// Deterministic RNG seed for a group, derived from its normalized form via
/// FNV-1a (64-bit) so that sample selection is reproducible across runs.
fn seed_for_group(normalized: &str) -> u64 {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;
    normalized.bytes().fold(FNV_OFFSET, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
/// Renders the text-mode collapse marker, e.g.
/// `[+12 similar | 10:15:01 → 10:15:59 | uuid×12+, path×2 {a, b}]`.
/// The time range is suppressed in essence mode; sample values are inlined
/// only for uncapped types with at most `inline_threshold` distinct values.
/// NOTE: uses a let-chain (`if … && let …`), which requires a recent Rust
/// edition/toolchain.
fn render_compact_marker(
    count: usize,
    rollup: &GroupRollup,
    first_ts: Option<&str>,
    last_ts: Option<&str>,
    inline_threshold: usize,
    essence_mode: bool,
) -> String {
    let mut out = format!("[+{count} similar");
    if !essence_mode && let (Some(a), Some(b)) = (first_ts, last_ts) {
        out.push_str(" | ");
        out.push_str(a);
        out.push_str(" → ");
        out.push_str(b);
    }
    // Keep entries that either carry samples or are small enough to inline;
    // drop empty (zero-distinct) entries entirely.
    let worthy: Vec<(&&'static str, &VariationEntry)> = rollup
        .iter()
        .filter(|(_, entry)| !entry.samples.is_empty() || entry.distinct_count <= inline_threshold)
        .filter(|(_, entry)| entry.distinct_count > 0)
        .collect();
    if !worthy.is_empty() {
        const SAMPLE_MAX_LEN: usize = 50;
        // Char-based truncation with a trailing ellipsis for long samples.
        fn truncate_sample(s: &str) -> String {
            if s.len() <= SAMPLE_MAX_LEN {
                s.to_string()
            } else {
                let mut out = s.chars().take(SAMPLE_MAX_LEN - 1).collect::<String>();
                out.push('…');
                out
            }
        }
        out.push_str(" | ");
        let mut first = true;
        for (name, entry) in &worthy {
            if !first {
                out.push_str(", ");
            }
            first = false;
            // e.g. "uuid×12+"; the '+' flags a capped (under)count.
            out.push_str(&name.to_lowercase());
            out.push('×');
            out.push_str(&entry.distinct_count.to_string());
            if entry.capped {
                out.push('+');
            }
            if entry.distinct_count <= inline_threshold
                && !entry.capped
                && !entry.samples.is_empty()
            {
                out.push_str(" {");
                let truncated: Vec<String> =
                    entry.samples.iter().map(|s| truncate_sample(s)).collect();
                out.push_str(&truncated.join(", "));
                out.push('}');
            }
        }
    }
    out.push(']');
    out
}
/// Distinct-value tracker for one token type: sample-worthy types keep the
/// actual strings (`Values`); all others keep only 64-bit hashes (`Hashes`)
/// to bound memory while still counting distincts.
enum Accumulator {
    Values(HashSet<String>),
    Hashes(HashSet<u64>),
}

impl Accumulator {
    /// Number of distinct values observed so far.
    fn len(&self) -> usize {
        match self {
            Accumulator::Values(set) => set.len(),
            Accumulator::Hashes(set) => set.len(),
        }
    }
}
/// Computes per-token-type variation rollups for a group.
struct RollupComputer {
    // Number of sample values to draw per token type.
    k: usize,
    // Stop tracking a token type once this many distinct values are seen.
    distinct_cap: usize,
}
impl RollupComputer {
    /// Builds a computer with explicit sample count and distinct cap.
    fn new(k: usize, distinct_cap: usize) -> Self {
        Self { k, distinct_cap }
    }
    /// Builds a computer with the module-level ROLLUP_* defaults.
    fn with_defaults() -> Self {
        Self::new(ROLLUP_K, ROLLUP_DISTINCT_CAP)
    }
    /// Scans every token of every line in `group` and produces, per token
    /// type, a distinct count, a deterministic set of up to `k` samples
    /// (sample-worthy types only), and a `capped` flag set once the distinct
    /// cap is hit (further values of that type are then ignored).
    fn compute(&self, group: &PatternGroup) -> GroupRollup {
        // Value pairs are (accumulator, capped?).
        let mut per_type: BTreeMap<&'static str, (Accumulator, bool)> = BTreeMap::new();
        let capacity_hint = group.lines.len().min(self.distinct_cap);
        for line in &group.lines {
            for token in &line.tokens {
                let name = token_type_name(token);
                let sample_worthy = is_sample_worthy(token);
                let entry = per_type.entry(name).or_insert_with(|| {
                    (
                        // Sample-worthy types keep real values; the rest
                        // only keep hashes to bound memory.
                        if sample_worthy {
                            Accumulator::Values(HashSet::with_capacity(capacity_hint))
                        } else {
                            Accumulator::Hashes(HashSet::with_capacity(capacity_hint))
                        },
                        false,
                    )
                });
                // Once capped, this token type is no longer tracked.
                if entry.1 {
                    continue;
                }
                match &mut entry.0 {
                    Accumulator::Values(s) => {
                        if s.len() >= self.distinct_cap {
                            entry.1 = true;
                        } else {
                            s.insert(token_value_string(token));
                        }
                    }
                    Accumulator::Hashes(s) => {
                        if s.len() >= self.distinct_cap {
                            entry.1 = true;
                        } else {
                            s.insert(hash_token_value(token));
                        }
                    }
                }
            }
        }
        // Seed the RNG from the normalized form so the drawn samples are
        // reproducible for the same pattern across runs.
        let mut rng = ChaCha8Rng::seed_from_u64(seed_for_group(&group.first().normalized));
        let mut out: GroupRollup = BTreeMap::new();
        for (name, (acc, capped)) in per_type {
            let distinct_count = acc.len();
            let samples = match acc {
                Accumulator::Values(s) => {
                    // Sort before drawing (set order is nondeterministic),
                    // then sort the drawn samples for stable output.
                    let mut distinct: Vec<String> = s.into_iter().collect();
                    distinct.sort();
                    let drawn_refs: Vec<&String> =
                        distinct.choose_multiple(&mut rng, self.k).collect();
                    let mut drawn: Vec<String> = drawn_refs.into_iter().cloned().collect();
                    drawn.sort();
                    drawn
                }
                // Hash-only types never expose sample values.
                Accumulator::Hashes(_) => Vec::new(),
            };
            out.insert(
                name,
                VariationEntry {
                    distinct_count,
                    samples,
                    capped,
                },
            );
        }
        out
    }
}
impl PatternFolder {
/// Creates a folder for one run; the config is cloned into the normalizer.
pub fn new(config: Config) -> Self {
    let normalizer = Normalizer::new(config.clone());
    Self {
        config,
        normalizer,
        buffer: Vec::new(),
        stats: FoldingStats::default(),
        position_counter: 0,
        batch_buffer: Vec::new(),
        next_json_id: 0,
        rollup_computer: RollupComputer::with_defaults(),
    }
}
/// True when the configured output format is one of the JSON modes.
fn is_json_output(&self) -> bool {
    let format = self.config.output_format.as_str();
    format == "json" || format == "jsonl"
}
/// Renders a group in the configured format. The (possibly expensive)
/// variation rollup is only computed for groups large enough to collapse;
/// smaller groups get an empty rollup.
fn format_group_dispatch(&mut self, group: &PatternGroup) -> Result<String> {
    let rollup = if group.count() >= self.config.min_collapse {
        self.rollup_computer.compute(group)
    } else {
        BTreeMap::new()
    };
    if self.is_json_output() {
        // JSON consumes the rollup by value; text borrows it.
        self.format_group_json(group, rollup)
    } else {
        self.format_group(group, &rollup)
    }
}
/// Serializes a group as a single-line `"type": "group"` JSON record and
/// updates the collapse statistics.
fn format_group_json(
    &mut self,
    group: &PatternGroup,
    variation: GroupRollup,
) -> Result<String> {
    // Ids are assigned in emission order.
    let id = self.next_json_id;
    self.next_json_id += 1;
    if group.count() >= self.config.min_collapse && !self.config.essence_mode {
        self.stats.collapsed_groups += 1;
        // One JSON record stands in for the whole group.
        self.stats.lines_saved += group.count() - 1;
    }
    // Token types are collected from the first and last lines only (not
    // every line); BTreeSet keeps them sorted and deduplicated.
    let mut token_types: std::collections::BTreeSet<&'static str> =
        std::collections::BTreeSet::new();
    for t in &group.first().tokens {
        token_types.insert(token_type_name(t));
    }
    for t in &group.last().tokens {
        token_types.insert(token_type_name(t));
    }
    let record = GroupRecord {
        record_type: "group",
        id,
        count: group.count(),
        token_types: token_types.into_iter().collect(),
        normalized: group.first().normalized.clone(),
        first: LineRef {
            line: group.first().original.clone(),
            line_no: group.first_line_no,
        },
        last: LineRef {
            line: group.last().original.clone(),
            line_no: group.last_line_no,
        },
        time_range: TimeRange {
            first_seen: first_timestamp_in(&group.first().tokens),
            last_seen: first_timestamp_in(&group.last().tokens),
        },
        variation,
    };
    Ok(serde_json::to_string(&record)?)
}
/// Writes the trailing `"type": "summary"` JSON record (plus newline) to
/// `writer`, mirroring the accumulated `FoldingStats`.
pub fn print_summary_json(&self, writer: &mut impl io::Write, elapsed: Duration) -> Result<()> {
    // Guard against division by zero on empty input.
    let compression_ratio = if self.stats.total_lines > 0 {
        (self.stats.lines_saved as f64 / self.stats.total_lines as f64) * 100.0
    } else {
        0.0
    };
    let record = SummaryRecord {
        record_type: "summary",
        stats: StatsJson {
            input_lines: self.stats.total_lines,
            output_lines: self.stats.output_lines,
            compression_ratio,
            collapsed_groups: self.stats.collapsed_groups,
            lines_saved: self.stats.lines_saved,
            patterns_detected: self.stats.patterns_detected,
            elapsed_ms: elapsed.as_millis() as u64,
            pattern_hits: PatternHits {
                timestamps: self.stats.timestamps,
                ips: self.stats.ips,
                hashes: self.stats.hashes,
                uuids: self.stats.uuids,
                pids: self.stats.pids,
                durations: self.stats.durations,
                http_status: self.stats.http_status,
                sizes: self.stats.sizes,
                percentages: self.stats.percentages,
                paths: self.stats.paths,
                kubernetes: self.stats.kubernetes,
                emails: self.stats.emails,
            },
        },
    };
    serde_json::to_writer(&mut *writer, &record)?;
    writeln!(writer)?;
    Ok(())
}
/// Feeds one raw line into the folder. Returns a formatted group when the
/// buffer overflowed and an old group was flushed; otherwise `None`.
pub fn process_line(&mut self, line: &str) -> Result<Option<String>> {
    self.stats.total_lines += 1;
    self.position_counter += 1;
    // Any thread_count other than exactly 1 (including None) takes the
    // batched/parallel path; lines are queued and processed 10k at a time.
    // NOTE(review): batched lines are later clustered with the counter
    // value at flush time, not their own position — confirm intended.
    if self.config.thread_count != Some(1) {
        self.batch_buffer.push(line.to_string());
        if self.batch_buffer.len() >= 10_000 {
            self.process_batch()?;
        }
        return Ok(None);
    }
    let normalized_line = self.normalizer.normalize_line(line.to_string())?;
    if !normalized_line.tokens.is_empty() {
        self.stats.patterns_detected += 1;
        self.count_pattern_types(&normalized_line.tokens);
    }
    // Linear scan for the first similar open group.
    let mut match_index = None;
    for (i, group) in self.buffer.iter().enumerate() {
        if self.normalizer.are_similar(&normalized_line, group.first()) {
            match_index = Some(i);
            break;
        }
    }
    if let Some(index) = match_index {
        self.buffer[index].add_line(normalized_line, self.position_counter);
    } else {
        self.buffer
            .push(PatternGroup::new(normalized_line, self.position_counter));
    }
    // Bound memory: evict the oldest flushable group when too many are open.
    if self.should_flush_buffer() {
        return self.flush_oldest_safe_group();
    }
    Ok(None)
}
/// Flushes (formats and removes) the oldest group that is "ready": either
/// large enough to collapse, or opened more than `SAFE_DISTANCE` lines ago
/// (so a late match is unlikely). Returns `None` when nothing is ready.
fn flush_oldest_safe_group(&mut self) -> Result<Option<String>> {
    if self.buffer.is_empty() {
        return Ok(None);
    }
    const SAFE_DISTANCE: usize = 100;
    let current_position = self.position_counter;
    let min_collapse = self.config.min_collapse;
    // Pick the ready group with the smallest position (oldest first).
    let oldest_index = self
        .buffer
        .iter()
        .enumerate()
        .filter(|(_, group)| {
            group.should_collapse(min_collapse)
                || current_position - group.position > SAFE_DISTANCE
        })
        .min_by_key(|(_, group)| group.position)
        .map(|(index, _)| index);
    match oldest_index {
        Some(index) => {
            let group = self.buffer.remove(index);
            let formatted = self.format_group_dispatch(&group)?;
            self.stats.output_lines += formatted.lines().count();
            Ok(Some(formatted))
        }
        None => Ok(None),
    }
}
/// Prints a "top patterns" summary to stdout (counts + representative
/// lines) and a coverage footnote to stderr.
///
/// Selection rules: `--top 0` shows all patterns (subject to `fit_budget`),
/// `--top N` shows exactly N, and with neither flag the list is capped at
/// `DEFAULT_SUMMARY_CAP`. When a fit budget trims the list, one slot is
/// reserved for the "... N more patterns" line.
pub fn finish_summary(
    &mut self,
    top_n: Option<usize>,
    fit_budget: Option<usize>,
) -> Result<()> {
    if !self.batch_buffer.is_empty() {
        self.process_batch()?;
    }
    // Merge groups sharing a normalized form; keep the first group's
    // original line as the representative.
    let mut merged: HashMap<String, (usize, String)> = HashMap::new();
    for group in &self.buffer {
        let key = group.first().normalized.clone();
        let count = group.count();
        let representative = group.first().original.clone();
        merged
            .entry(key)
            .and_modify(|(c, _)| *c += count)
            .or_insert((count, representative));
    }
    let mut sorted: Vec<(usize, String)> = merged.into_values().collect();
    sorted.sort_by(|a, b| b.0.cmp(&a.0));
    let total_patterns = sorted.len();
    const DEFAULT_SUMMARY_CAP: usize = 30;
    // Trim to `budget` entries, reserving one slot for the trailing
    // "... N more" line; returns (display list, hidden count).
    fn apply_fit(
        sorted: Vec<(usize, String)>,
        budget: usize,
    ) -> (Vec<(usize, String)>, usize) {
        if sorted.len() > budget {
            let show = budget.saturating_sub(1);
            let remaining = sorted.len() - show;
            (sorted.into_iter().take(show).collect(), remaining)
        } else {
            (sorted, 0)
        }
    }
    let (display, was_capped, fit_truncated): (Vec<_>, bool, usize) =
        match (top_n, fit_budget) {
            // --top 0 ("all") and no --top both defer to the fit budget.
            (Some(0), Some(budget)) | (None, Some(budget)) => {
                let (display, remaining) = apply_fit(sorted, budget);
                (display, false, remaining)
            }
            (Some(0), None) => (sorted, false, 0),
            // An explicit --top N overrides the fit budget.
            (Some(n), _) => (sorted.into_iter().take(n).collect(), false, 0),
            (None, None) if total_patterns > DEFAULT_SUMMARY_CAP => (
                sorted.into_iter().take(DEFAULT_SUMMARY_CAP).collect(),
                true,
                0,
            ),
            (None, None) => (sorted, false, 0),
        };
    let shown_count = display.len();
    use std::io::IsTerminal;
    // Only truncate lines when stdout is an interactive terminal.
    let max_width: Option<usize> = if std::io::stdout().is_terminal() {
        terminal_size::terminal_size().map(|(w, _)| w.0 as usize)
    } else {
        None
    };
    for (count, representative) in &display {
        let prefix = format!("[{count}x] ");
        match max_width {
            Some(width) if prefix.len() + representative.len() > width => {
                let avail = width.saturating_sub(prefix.len() + 3);
                if avail > 20 {
                    // Back the cut up to a char boundary: byte-slicing a
                    // UTF-8 line at an arbitrary index panics.
                    let mut cut = avail;
                    while !representative.is_char_boundary(cut) {
                        cut -= 1;
                    }
                    println!("{prefix}{}...", &representative[..cut]);
                } else {
                    println!("{prefix}{representative}");
                }
            }
            _ => println!("{prefix}{representative}"),
        }
    }
    if fit_truncated > 0 {
        println!("... {fit_truncated} more patterns (remove --fit for full output)");
    }
    let shown_lines: usize = display.iter().map(|(c, _)| c).sum();
    let coverage = if self.stats.total_lines > 0 {
        (shown_lines as f64 / self.stats.total_lines as f64) * 100.0
    } else {
        0.0
    };
    // Footnote goes to stderr so piped stdout stays machine-readable.
    if was_capped {
        eprintln!(
            "({shown_count} of {total_patterns} patterns, {coverage:.0}% coverage — use --top N to adjust, or --top 0 for all)",
        );
    } else {
        eprintln!(
            "({shown_count} of {total_patterns} patterns, {shown_lines} of {} lines, {coverage:.0}% coverage)",
            self.stats.total_lines
        );
    }
    Ok(())
}
/// Flushes any pending batch, then formats every remaining group in input
/// order and drains the buffer. Returns one formatted string per group.
pub fn finish(&mut self) -> Result<Vec<String>> {
    if !self.batch_buffer.is_empty() {
        self.process_batch()?;
    }
    self.buffer.sort_by_key(|group| group.position);
    // Take ownership and iterate instead of repeatedly `remove(0)`-ing,
    // which shifted the whole Vec each time (O(n²)).
    let groups = std::mem::take(&mut self.buffer);
    let mut output = Vec::with_capacity(groups.len());
    for group in groups {
        let formatted = self.format_group_dispatch(&group)?;
        self.stats.output_lines += formatted.lines().count();
        output.push(formatted);
    }
    Ok(output)
}
/// Drains all groups and formats only the `n` largest by line count.
/// Returns `(formatted groups with counts, total group count, coverage %)`
/// where coverage is the share of input lines the shown groups represent.
pub fn finish_top_n(&mut self, n: usize) -> Result<(Vec<(usize, String)>, usize, usize)> {
    if !self.batch_buffer.is_empty() {
        self.process_batch()?;
    }
    let mut groups_with_counts: Vec<(usize, PatternGroup)> =
        self.buffer.drain(..).map(|g| (g.count(), g)).collect();
    // Largest groups first.
    groups_with_counts.sort_by(|a, b| b.0.cmp(&a.0));
    let total_groups = groups_with_counts.len();
    let total_input_lines = self.stats.total_lines;
    let top_groups: Vec<(usize, PatternGroup)> =
        groups_with_counts.into_iter().take(n).collect();
    let lines_covered: usize = top_groups.iter().map(|(c, _)| c).sum();
    let mut output = Vec::new();
    for (count, group) in top_groups {
        let formatted = self.format_group_dispatch(&group)?;
        self.stats.output_lines += formatted.lines().count();
        output.push((count, formatted));
    }
    Ok((
        output,
        total_groups,
        // Coverage percentage, truncated toward zero.
        if total_input_lines > 0 {
            (lines_covered as f64 / total_input_lines as f64 * 100.0) as usize
        } else {
            0
        },
    ))
}
/// True once the number of open groups exceeds the memory-bounding limit,
/// signalling that the oldest safe group should be flushed.
fn should_flush_buffer(&self) -> bool {
    const MAX_OPEN_GROUPS: usize = 1000;
    self.buffer.len() > MAX_OPEN_GROUPS
}
#[allow(dead_code)]
/// Optional post-pass that merges buffered groups whose first lines score
/// above the similarity threshold. Repeats until a fixed point or 10
/// iterations. O(n²) per iteration over the buffer.
fn apply_second_similarity_pass(&mut self) -> Result<()> {
    if self.buffer.len() <= 1 {
        return Ok(());
    }
    let mut merged_any = true;
    let mut iterations = 0;
    while merged_any && iterations < 10 {
        merged_any = false;
        iterations += 1;
        let mut i = 0;
        while i < self.buffer.len() {
            let mut j = i + 1;
            while j < self.buffer.len() {
                let Some(group1_first) = self.buffer[i].lines.first() else {
                    j += 1;
                    continue;
                };
                let Some(group2_first) = self.buffer[j].lines.first() else {
                    j += 1;
                    continue;
                };
                // Identical normalized forms are skipped here — those would
                // already have been merged during streaming.
                if group1_first.normalized == group2_first.normalized {
                    j += 1;
                    continue;
                }
                let similarity = self.normalizer.similarity_score(group1_first, group2_first);
                if similarity >= f64::from(self.config.threshold) {
                    let group_to_merge = self.buffer.remove(j);
                    let merged_last_line_no = group_to_merge.last_line_no;
                    // NOTE(review): every moved line is stamped with the
                    // merged group's *last* line number; per-line numbers
                    // are lost — confirm that is acceptable.
                    for line in group_to_merge.lines {
                        self.buffer[i].add_line(line, merged_last_line_no);
                    }
                    // Don't advance j: the element now at j is new.
                    merged_any = true;
                } else {
                    j += 1;
                }
            }
            i += 1;
        }
    }
    Ok(())
}
/// Tallies per-category hit counters for a line's tokens. Several token
/// kinds share a bucket (ports count as "ips", JSON blobs as "paths", and
/// various structural tokens all land in "percentages"), mirroring how the
/// categories are reported in the stats output.
fn count_pattern_types(&mut self, tokens: &[Token]) {
    for token in tokens {
        match token {
            Token::Timestamp(_) => self.stats.timestamps += 1,
            Token::IPv4(_) | Token::IPv6(_) | Token::Port(_) => self.stats.ips += 1,
            Token::Hash(_, _) => self.stats.hashes += 1,
            Token::Uuid(_) => self.stats.uuids += 1,
            Token::Pid(_) | Token::ThreadID(_) => self.stats.pids += 1,
            Token::Duration(_) => self.stats.durations += 1,
            Token::Size(_) => self.stats.sizes += 1,
            Token::HttpStatus(_) | Token::HttpStatusClass(_) => self.stats.http_status += 1,
            Token::Path(_) | Token::Json(_) => self.stats.paths += 1,
            Token::KubernetesNamespace(_)
            | Token::VolumeName(_)
            | Token::PluginType(_)
            | Token::PodName(_) => self.stats.kubernetes += 1,
            Token::Email(_) => self.stats.emails += 1,
            Token::Number(_)
            | Token::QuotedString(_)
            | Token::Name(_)
            | Token::BracketContext(_)
            | Token::KeyValuePair { .. }
            | Token::LogWithModule { .. }
            | Token::StructuredMessage { .. } => self.stats.percentages += 1,
        }
    }
}
/// Renders a group as text. Collapsible groups become three lines — first
/// line, `[+N similar …]` marker, last line — while smaller groups (or
/// essence mode) are emitted line by line / as one normalized line.
fn format_group(&mut self, group: &PatternGroup, rollup: &GroupRollup) -> Result<String> {
    if group.should_collapse(self.config.min_collapse) && !self.config.essence_mode {
        self.stats.collapsed_groups += 1;
        // Collapsed output occupies 3 lines; saturate so a small group
        // (count < 3, possible when min_collapse <= 2) cannot underflow
        // usize and panic in debug builds.
        self.stats.lines_saved += group.count().saturating_sub(3);
        let collapsed_line = if !rollup.is_empty() {
            let first_ts = first_timestamp_in(&group.first().tokens);
            let last_ts = first_timestamp_in(&group.last().tokens);
            render_compact_marker(
                // Marker count excludes the first/last lines shown verbatim.
                group.count().saturating_sub(2),
                rollup,
                first_ts.as_deref(),
                last_ts.as_deref(),
                ROLLUP_TEXT_SAMPLE_THRESHOLD,
                self.config.essence_mode,
            )
        } else {
            self.normalizer.format_collapsed_line(
                group.first(),
                group.last(),
                group.count().saturating_sub(2),
            )
        };
        let mut result = String::new();
        // Essence mode shows normalized forms; otherwise raw originals.
        let first_line = if self.config.essence_mode {
            &group.first().normalized
        } else {
            &group.first().original
        };
        // PII masking only applies to raw lines (essence output is already
        // normalized).
        let first_line_output = if self.config.sanitize_pii && !self.config.essence_mode {
            apply_pii_masking(first_line, &group.first().tokens)
        } else {
            first_line.clone()
        };
        result.push_str(&first_line_output);
        result.push('\n');
        result.push_str(&collapsed_line);
        if group.count() > 1 {
            let last_line = if self.config.essence_mode {
                &group.last().normalized
            } else {
                &group.last().original
            };
            // In essence mode, skip a last line identical to the first.
            if !self.config.essence_mode || first_line != last_line {
                result.push('\n');
                let last_line_output = if self.config.sanitize_pii && !self.config.essence_mode
                {
                    apply_pii_masking(last_line, &group.last().tokens)
                } else {
                    last_line.clone()
                };
                result.push_str(&last_line_output);
            }
        }
        Ok(result)
    } else {
        let mut result = String::new();
        if self.config.essence_mode {
            // Essence mode: a single normalized line stands in for the
            // whole (non-collapsed) group.
            let line_text = &group.first().normalized;
            result.push_str(line_text);
            if group.count() > 1 {
                self.stats.lines_saved += group.count() - 1;
            }
        } else {
            // Too small to collapse: emit every line verbatim.
            for (i, line) in group.lines.iter().enumerate() {
                if i > 0 {
                    result.push('\n');
                }
                let line_output = if self.config.sanitize_pii {
                    apply_pii_masking(&line.original, &line.tokens)
                } else {
                    line.original.clone()
                };
                result.push_str(&line_output);
            }
        }
        Ok(result)
    }
}
/// Writes a human-readable Markdown compression report: summary, a table of
/// active pattern categories, and analysis recommendations.
pub fn print_stats<W: Write>(&self, writer: &mut W) -> Result<()> {
    // Guard against division by zero on empty input.
    let compression_ratio = if self.stats.total_lines > 0 {
        (self.stats.lines_saved as f64 / self.stats.total_lines as f64) * 100.0
    } else {
        0.0
    };
    let output_lines = self.stats.output_lines;
    writeln!(writer, "\n---")?;
    writeln!(writer, "# lessence Compression Report")?;
    writeln!(
        writer,
        "*Generated by lessence v{} on {}*",
        env!("CARGO_PKG_VERSION"),
        Utc::now().format("%Y-%m-%dT%H:%M:%SZ")
    )?;
    writeln!(writer)?;
    writeln!(writer, "## Summary")?;
    writeln!(writer, "- **Original**: {} lines", self.stats.total_lines)?;
    writeln!(
        writer,
        "- **Compressed**: {output_lines} lines ({compression_ratio:.1}% reduction)"
    )?;
    writeln!(
        writer,
        "- **Patterns detected**: {} across {} categories",
        self.stats.patterns_detected,
        self.count_active_pattern_types()
    )?;
    writeln!(
        writer,
        "- **Collapsed groups**: {} ({} lines saved)",
        self.stats.collapsed_groups, self.stats.lines_saved
    )?;
    writeln!(writer)?;
    // Pattern table: one row per category that recorded at least one hit.
    writeln!(writer, "## Pattern Distribution")?;
    writeln!(writer, "| Pattern Type | Count | Description |")?;
    writeln!(writer, "|--------------|-------|-------------|")?;
    if self.stats.timestamps > 0 {
        writeln!(
            writer,
            "| Timestamps | {} | Log timestamps, dates, times |",
            self.stats.timestamps
        )?;
    }
    if self.stats.ips > 0 {
        writeln!(
            writer,
            "| IP Addresses | {} | IPv4, IPv6, ports, network addresses |",
            self.stats.ips
        )?;
    }
    if self.stats.hashes > 0 {
        writeln!(
            writer,
            "| Hashes | {} | Pod UIDs, container IDs, volume names, checksums |",
            self.stats.hashes
        )?;
    }
    if self.stats.uuids > 0 {
        writeln!(
            writer,
            "| UUIDs | {} | Request IDs, trace IDs, unique identifiers |",
            self.stats.uuids
        )?;
    }
    if self.stats.durations > 0 {
        writeln!(
            writer,
            "| Durations | {} | Timeouts, latencies, elapsed times |",
            self.stats.durations
        )?;
    }
    if self.stats.pids > 0 {
        writeln!(
            writer,
            "| Process IDs | {} | PIDs, thread IDs, process identifiers |",
            self.stats.pids
        )?;
    }
    if self.stats.sizes > 0 {
        writeln!(
            writer,
            "| File Sizes | {} | Memory usage, file sizes, data volumes |",
            self.stats.sizes
        )?;
    }
    if self.stats.percentages > 0 {
        writeln!(
            writer,
            "| Numbers/Percentages | {} | CPU usage, percentages, metrics |",
            self.stats.percentages
        )?;
    }
    if self.stats.http_status > 0 {
        writeln!(
            writer,
            "| HTTP Status | {} | Response codes, error codes |",
            self.stats.http_status
        )?;
    }
    if self.stats.paths > 0 {
        writeln!(
            writer,
            "| File Paths | {} | File paths, URLs, directories |",
            self.stats.paths
        )?;
    }
    if self.stats.kubernetes > 0 {
        writeln!(
            writer,
            "| Kubernetes | {} | Namespaces, volumes, plugins, pod names |",
            self.stats.kubernetes
        )?;
    }
    if self.stats.emails > 0 {
        writeln!(
            writer,
            "| Email Addresses | {} | RFC 5322 email addresses, user accounts |",
            self.stats.emails
        )?;
    }
    writeln!(writer)?;
    // Qualitative guidance keyed off the achieved compression ratio.
    writeln!(writer, "## Recommendations for Analysis")?;
    if compression_ratio > 90.0 {
        writeln!(
            writer,
            "- **High compression ratio** ({compression_ratio:.1}%) indicates many repetitive patterns"
        )?;
    } else if compression_ratio > 70.0 {
        writeln!(
            writer,
            "- **Moderate compression ratio** ({compression_ratio:.1}%) indicates some repetitive patterns"
        )?;
    } else {
        writeln!(
            writer,
            "- **Low compression ratio** ({compression_ratio:.1}%) indicates diverse log content"
        )?;
    }
    writeln!(
        writer,
        "- **Search strategy**: Use compressed output to identify error types, then grep original logs for details"
    )?;
    writeln!(
        writer,
        "- **Variation indicators**: Pay attention to `[+N similar, varying: X, Y]` to understand what changes between similar errors"
    )?;
    writeln!(
        writer,
        "- **Focus areas**: Unique error messages that couldn't be compressed likely indicate distinct issues"
    )?;
    if self.stats.collapsed_groups > 50 {
        writeln!(
            writer,
            "- **High pattern repetition**: {} collapsed groups suggest systematic issues worth investigating",
            self.stats.collapsed_groups
        )?;
    }
    writeln!(writer, "---")?;
    Ok(())
}
/// Writes the run statistics as a single JSON object (plus newline) to
/// stderr, leaving stdout free for the folded log output.
pub fn print_stats_json(&self, elapsed: Duration) -> Result<()> {
    // Guard against division by zero on empty input.
    let compression_ratio = if self.stats.total_lines > 0 {
        (self.stats.lines_saved as f64 / self.stats.total_lines as f64) * 100.0
    } else {
        0.0
    };
    let stats_json = StatsJson {
        input_lines: self.stats.total_lines,
        output_lines: self.stats.output_lines,
        compression_ratio,
        collapsed_groups: self.stats.collapsed_groups,
        lines_saved: self.stats.lines_saved,
        patterns_detected: self.stats.patterns_detected,
        elapsed_ms: elapsed.as_millis() as u64,
        pattern_hits: PatternHits {
            timestamps: self.stats.timestamps,
            ips: self.stats.ips,
            hashes: self.stats.hashes,
            uuids: self.stats.uuids,
            pids: self.stats.pids,
            durations: self.stats.durations,
            http_status: self.stats.http_status,
            sizes: self.stats.sizes,
            percentages: self.stats.percentages,
            paths: self.stats.paths,
            kubernetes: self.stats.kubernetes,
            emails: self.stats.emails,
        },
    };
    let stderr = io::stderr();
    let mut handle = stderr.lock();
    serde_json::to_writer(&mut handle, &stats_json)?;
    writeln!(handle)?;
    Ok(())
}
/// Number of pattern categories with at least one recorded hit. Feeds the
/// "patterns detected across N categories" line of the report.
fn count_active_pattern_types(&self) -> usize {
    // One entry per category tracked in FoldingStats. `emails` was missing
    // here even though it is tracked, printed in the pattern table, and
    // included in the JSON stats — it now counts like every other category.
    [
        self.stats.timestamps,
        self.stats.ips,
        self.stats.hashes,
        self.stats.uuids,
        self.stats.durations,
        self.stats.pids,
        self.stats.sizes,
        self.stats.percentages,
        self.stats.http_status,
        self.stats.paths,
        self.stats.kubernetes,
        self.stats.emails,
    ]
    .iter()
    .filter(|&&hits| hits > 0)
    .count()
}
/// Drains the queued batch: pattern detection runs in parallel, then each
/// normalized line is clustered sequentially (clustering mutates shared
/// group state and must stay single-threaded).
fn process_batch(&mut self) -> Result<()> {
    let batch = std::mem::take(&mut self.batch_buffer);
    let processed_lines = self.parallel_pattern_detection(&batch)?;
    for processed_line in processed_lines {
        self.sequential_clustering(processed_line)?;
    }
    Ok(())
}
/// Normalizes/tokenizes raw lines on the rayon thread pool, preserving
/// input order. Fails fast on the first normalization error.
fn parallel_pattern_detection(&self, lines: &[String]) -> Result<Vec<LogLine>> {
    use rayon::prelude::*;
    let normalized = lines
        .par_iter()
        .map(|raw| self.normalizer.normalize_line(raw.clone()))
        .collect::<Result<Vec<_>, _>>()?;
    Ok(normalized)
}
/// Clusters one already-normalized line into the first similar open group,
/// or opens a new group when none matches. Mirrors the clustering step of
/// `process_line` for the batched path.
fn sequential_clustering(&mut self, normalized_line: LogLine) -> Result<()> {
    if !normalized_line.tokens.is_empty() {
        self.stats.patterns_detected += 1;
        self.count_pattern_types(&normalized_line.tokens);
    }
    let existing = self
        .buffer
        .iter()
        .position(|group| self.normalizer.are_similar(&normalized_line, group.first()));
    match existing {
        Some(index) => self.buffer[index].add_line(normalized_line, self.position_counter),
        None => self
            .buffer
            .push(PatternGroup::new(normalized_line, self.position_counter)),
    }
    Ok(())
}
/// Read-only view of the accumulated run statistics.
pub fn get_stats(&self) -> &FoldingStats {
    &self.stats
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // Similar lines buffered during streaming produce no immediate output;
    // they are held until flushed or finished.
    #[test]
    fn test_simple_folding() -> Result<()> {
        let config = Config::default();
        let mut folder = PatternFolder::new(config);
        let line1 = "2025-01-20 10:15:01 [pid=12345] Connection failed to 192.168.1.100:8080";
        let line2 = "2025-01-20 10:15:02 [pid=12346] Connection failed to 192.168.1.101:8081";
        let line3 = "2025-01-20 10:15:03 [pid=12347] Connection failed to 192.168.1.102:8082";
        folder.process_line(line1)?;
        folder.process_line(line2)?;
        let result = folder.process_line(line3)?;
        assert!(result.is_none());
        Ok(())
    }
    // With min_collapse lowered, three similar lines collapse into output
    // containing a "[+N similar …]" marker at finish time.
    #[test]
    fn test_folding_with_finish() -> Result<()> {
        let config = Config {
            min_collapse: 2,
            ..Config::default()
        };
        let mut folder = PatternFolder::new(config);
        let line1 = "2025-01-20 10:15:01 [pid=12345] Connection failed to 192.168.1.100:8080";
        let line2 = "2025-01-20 10:15:02 [pid=12346] Connection failed to 192.168.1.101:8081";
        let line3 = "2025-01-20 10:15:03 [pid=12347] Connection failed to 192.168.1.102:8082";
        folder.process_line(line1)?;
        folder.process_line(line2)?;
        folder.process_line(line3)?;
        let results = folder.finish()?;
        assert!(!results.is_empty());
        let output = results.join("\n");
        assert!(
            output.contains("similar"),
            "Expected 'similar' in compact output, got: {output}"
        );
        Ok(())
    }
    // Dissimilar lines must pass through verbatim, uncollapsed.
    #[test]
    fn test_no_folding_for_different_lines() -> Result<()> {
        let config = Config::default();
        let mut folder = PatternFolder::new(config);
        let line1 = "2025-01-20 10:15:01 Starting application";
        let line2 = "2025-01-20 10:15:02 Loading configuration";
        let line3 = "2025-01-20 10:15:03 Database connected";
        folder.process_line(line1)?;
        folder.process_line(line2)?;
        folder.process_line(line3)?;
        let results = folder.finish()?;
        let output = results.join("\n");
        assert!(!output.contains("collapsed"));
        assert!(output.contains("Starting application"));
        assert!(output.contains("Loading configuration"));
        assert!(output.contains("Database connected"));
        Ok(())
    }
}