timber_rs/
analyzer.rs

1use lazy_static::lazy_static;
2use memmap2::Mmap;
3use rayon::prelude::*;
4use regex::Regex;
5use rustc_hash::{FxHashMap, FxHashSet};
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8
9// Constants
10const CHUNK_SIZE: usize = 1_048_576; // 1MB
11const MAX_UNIQUE_LINES: usize = 10000; // Maximum unique lines to store
12
13// Pre-compiled common regex patterns
14lazy_static! {
15    static ref LEVEL_REGEX: Regex = Regex::new(
16        r"\[((?i)ERROR|WARN|INFO|DEBUG|TRACE|SEVERE|WARNING|FINE)]|(?i:ERROR|WARN|INFO|DEBUG|TRACE|SEVERE|WARNING|FINE):"
17    ).unwrap();
18
19    static ref TIMESTAMP_REGEX: Regex = Regex::new(
20        r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})"
21    ).unwrap();
22
23    static ref ERROR_TYPE_REGEX: Regex = Regex::new(
24        r"([A-Za-z]+Exception|[A-Za-z]+Error|[A-Za-z]+\s+timeout|Connection timeout|500 Internal Server Error|401 Unauthorized|503 Service Unavailable)"
25    ).unwrap();
26}
27
28#[derive(Debug, Serialize, Deserialize, Default)]
29pub struct AnalysisResult {
30    pub matched_lines: Vec<String>,
31    pub line_counts: FxHashMap<String, usize>,
32    pub count: usize,
33    pub time_trends: FxHashMap<String, usize>,
34    pub levels_count: FxHashMap<String, usize>,
35    pub error_types: FxHashMap<String, usize>,
36    pub unique_messages: FxHashSet<String>,
37    pub deduplicated: bool,
38}
39
40// Pattern matcher trait for polymorphism
41pub trait PatternMatcher: Send + Sync {
42    fn is_match(&self, text: &str) -> bool;
43}
44
45// Fast literal matching
46pub struct LiteralMatcher {
47    pattern: String,
48}
49
50impl LiteralMatcher {
51    pub fn new(pattern: &str) -> Self {
52        Self {
53            pattern: pattern.to_string(),
54        }
55    }
56}
57
58impl PatternMatcher for LiteralMatcher {
59    fn is_match(&self, text: &str) -> bool {
60        // Standard string contains method
61        text.contains(&self.pattern)
62    }
63}
64
65// Regex-based matching for complex patterns
66pub struct RegexMatcher {
67    regex: Regex,
68}
69
70impl RegexMatcher {
71    pub fn new(pattern: &str) -> Self {
72        Self {
73            regex: Regex::new(pattern).unwrap(),
74        }
75    }
76}
77
78impl PatternMatcher for RegexMatcher {
79    fn is_match(&self, text: &str) -> bool {
80        self.regex.is_match(text)
81    }
82}
83
84// Structure to hold parsed data from a line
85struct ParsedLine<'a> {
86    level: &'a str,
87    timestamp: Option<&'a str>,
88    error_type: Option<String>,
89    message: Option<&'a str>,
90}
91
92pub struct LogAnalyzer {
93    pub(crate) pattern_matcher: Option<Box<dyn PatternMatcher + Send + Sync>>,
94    pub(crate) level_filter_lowercase: Option<String>,
95}
96
97impl Default for LogAnalyzer {
98    fn default() -> Self {
99        Self::new()
100    }
101}
102
103impl LogAnalyzer {
104    pub fn new() -> Self {
105        LogAnalyzer {
106            pattern_matcher: None,
107            level_filter_lowercase: None,
108        }
109    }
110
111    // Configure analyzer with patterns
112    pub fn configure(&mut self, pattern: Option<&str>, level_filter: Option<&str>) {
113        // Create appropriate pattern matcher
114        self.pattern_matcher = pattern.map(|p| {
115            if Self::is_complex_pattern(p) {
116                Box::new(RegexMatcher::new(p)) as Box<dyn PatternMatcher + Send + Sync>
117            } else {
118                Box::new(LiteralMatcher::new(p)) as Box<dyn PatternMatcher + Send + Sync>
119            }
120        });
121
122        // Store level filter in lowercase for fast comparison
123        self.level_filter_lowercase = level_filter.map(|l| l.to_lowercase());
124    }
125
126    // New method: Configure using the optimized SIMD factory
127    pub fn configure_optimized(&mut self, pattern: Option<&str>, level_filter: Option<&str>) {
128        // Use pattern matcher factory to create the most optimized matcher
129        self.pattern_matcher = pattern.map(crate::accelerated::PatternMatcherFactory::create);
130
131        // Store level filter in lowercase for fast comparison
132        self.level_filter_lowercase = level_filter.map(|l| l.to_lowercase());
133    }
134
135    // Check if pattern is complex and needs regex
136    fn is_complex_pattern(pattern: &str) -> bool {
137        pattern.contains(|c: char| {
138            c == '*'
139                || c == '?'
140                || c == '['
141                || c == '('
142                || c == '|'
143                || c == '+'
144                || c == '.'
145                || c == '^'
146                || c == '$'
147        })
148    }
149
150    // Fast pre-check for level filter before regex
151    fn quick_level_match(&self, line: &str) -> bool {
152        if self.level_filter_lowercase.is_none() {
153            return true;
154        }
155
156        // Get lowercase filter once
157        let filter = self.level_filter_lowercase.as_deref().unwrap();
158
159        // Fast check based on level type
160        match filter {
161            "error" => line.contains("ERROR") || line.contains("error"),
162            "warn" => line.contains("WARN") || line.contains("warn") || line.contains("WARNING"),
163            "info" => line.contains("INFO") || line.contains("info"),
164            "debug" => line.contains("DEBUG") || line.contains("debug"),
165            "trace" => line.contains("TRACE") || line.contains("trace"),
166            _ => true, // For custom levels, we'll need regex
167        }
168    }
169
170    // Parse line once to extract all needed data (legacy method)
171    fn parse_line<'a>(
172        &self,
173        line: &'a str,
174        need_timestamp: bool,
175        need_stats: bool,
176    ) -> ParsedLine<'a> {
177        // Initialize with defaults
178        let mut parsed = ParsedLine {
179            level: "",
180            timestamp: None,
181            error_type: None,
182            message: None,
183        };
184
185        // Extract log level if present
186        if let Some(caps) = LEVEL_REGEX.captures(line) {
187            parsed.level = caps
188                .get(1)
189                .map_or_else(|| caps.get(0).map_or("", |m| m.as_str()), |m| m.as_str());
190        }
191
192        // Extract timestamp only if needed
193        if need_timestamp {
194            if let Some(caps) = TIMESTAMP_REGEX.captures(line) {
195                if let Some(timestamp) = caps.get(1) {
196                    let timestamp_str = timestamp.as_str();
197                    if timestamp_str.len() >= 13 {
198                        parsed.timestamp = Some(&timestamp_str[0..13]);
199                    } else {
200                        parsed.timestamp = Some(timestamp_str);
201                    }
202                }
203            }
204        }
205
206        // Extract message and error type only if collecting stats
207        if need_stats {
208            parsed.message = self.extract_message(line);
209
210            // Error type extraction - still needs a String due to formatting in some cases
211            if let Some(caps) = ERROR_TYPE_REGEX.captures(line) {
212                if let Some(error_type) = caps.get(1) {
213                    parsed.error_type = Some(error_type.as_str().to_string());
214                }
215            } else if line.contains("Failed to") {
216                // Extract the specific failure reason
217                let parts: Vec<&str> = line.split("Failed to ").collect();
218                if parts.len() > 1 {
219                    let action_parts: Vec<&str> = parts[1].split(':').collect();
220                    if !action_parts.is_empty() {
221                        let action = action_parts[0].trim();
222                        parsed.error_type = Some(format!("Failed to {}", action));
223                    }
224                }
225            }
226        }
227
228        parsed
229    }
230
231    // Extract message with string slices instead of new Strings
232    fn extract_message<'a>(&self, line: &'a str) -> Option<&'a str> {
233        let parts: Vec<&str> = line.splitn(3, " - ").collect();
234        if parts.len() >= 3 {
235            Some(parts[2])
236        } else if parts.len() == 2 {
237            Some(parts[1])
238        } else {
239            Some(line)
240        }
241    }
242
243    // For API compatibility - analyze a single line
244    pub fn analyze_line(
245        &self,
246        line: &str,
247        pattern: Option<&Regex>,
248        level_filter: Option<&str>,
249        collect_trends: bool,
250        collect_stats: bool,
251    ) -> Option<(String, String, Option<String>)> {
252        // Parse line once to extract all needed data
253        let parsed = self.parse_line(line, collect_trends, collect_stats);
254
255        // Apply filters
256        let level_matches = match level_filter {
257            None => true,
258            Some(filter_level) => {
259                !parsed.level.is_empty()
260                    && parsed.level.to_uppercase() == filter_level.to_uppercase()
261            }
262        };
263
264        let pattern_matches = match pattern {
265            None => true,
266            Some(re) => re.is_match(line),
267        };
268
269        if level_matches && pattern_matches {
270            // Format timestamp for return value
271            let timestamp = parsed.timestamp.map(String::from);
272
273            return Some((line.to_string(), parsed.level.to_uppercase(), timestamp));
274        }
275
276        None
277    }
278
279    pub fn extract_error_type(&self, line: &str) -> Option<String> {
280        let parsed = self.parse_line(line, false, true);
281        parsed.error_type
282    }
283
284    // Optimized line processing for chunks
285    pub fn process_chunk_data(
286        &self,
287        data: &[u8],
288        result: &mut AnalysisResult,
289        collect_trends: bool,
290        collect_stats: bool,
291    ) {
292        // Split data into lines
293        for line in data.split(|&b| b == b'\n').filter(|l| !l.is_empty()) {
294            // Convert line to string, skip if invalid UTF-8
295            let line_str = match std::str::from_utf8(line) {
296                Ok(s) => s,
297                Err(_) => continue,
298            };
299
300            // Fast pre-check for pattern match
301            if let Some(matcher) = &self.pattern_matcher {
302                if !matcher.is_match(line_str) {
303                    continue;
304                }
305            }
306
307            // Fast pre-check for level filter
308            if !self.quick_level_match(line_str) {
309                continue;
310            }
311
312            // Apply full regex for level filtering if needed
313            let level = if self.level_filter_lowercase.is_some() || collect_stats {
314                if let Some(caps) = LEVEL_REGEX.captures(line_str) {
315                    caps.get(1)
316                        .map_or_else(|| caps.get(0).map_or("", |m| m.as_str()), |m| m.as_str())
317                } else {
318                    ""
319                }
320            } else {
321                ""
322            };
323
324            // Skip if level doesn't match filter
325            if let Some(filter) = &self.level_filter_lowercase {
326                if level.to_lowercase() != *filter {
327                    continue;
328                }
329            }
330
331            // We have a match - increment count
332            result.count += 1;
333
334            // Store the line with deduplication
335            let line_string = line_str.to_string();
336            let entry = result.line_counts.entry(line_string.clone()).or_insert(0);
337            *entry += 1;
338
339            // Add to matched_lines if this is the first occurrence and we're within limits
340            let is_first_occurrence = *entry == 1;
341            let within_limit = result.matched_lines.len() < MAX_UNIQUE_LINES;
342
343            if is_first_occurrence && within_limit {
344                result.matched_lines.push(line_string);
345            }
346
347            // Extract additional information only if needed
348            if collect_stats || collect_trends {
349                // Extract timestamp for trends
350                let timestamp = if collect_trends {
351                    TIMESTAMP_REGEX.captures(line_str).and_then(|caps| {
352                        caps.get(1).map(|m| {
353                            let ts = m.as_str();
354                            if ts.len() >= 13 { &ts[0..13] } else { ts }
355                        })
356                    })
357                } else {
358                    None
359                };
360
361                // Add stats if requested
362                if collect_stats {
363                    // Add level count if we have a level
364                    if !level.is_empty() {
365                        let level_upper = level.to_uppercase();
366                        *result.levels_count.entry(level_upper).or_insert(0) += 1;
367                    }
368
369                    // Extract error type for stats
370                    let error_type = if let Some(caps) = ERROR_TYPE_REGEX.captures(line_str) {
371                        caps.get(1).map(|m| m.as_str().to_string())
372                    } else if line_str.contains("Failed to") {
373                        // Extract specific failure reason
374                        let parts: Vec<&str> = line_str.split("Failed to ").collect();
375                        if parts.len() > 1 {
376                            let action_parts: Vec<&str> = parts[1].split(':').collect();
377                            if !action_parts.is_empty() {
378                                let action = action_parts[0].trim();
379                                Some(format!("Failed to {}", action))
380                            } else {
381                                None
382                            }
383                        } else {
384                            None
385                        }
386                    } else {
387                        None
388                    };
389
390                    // Add error type to stats
391                    if let Some(error) = error_type {
392                        *result.error_types.entry(error).or_insert(0) += 1;
393                    }
394
395                    // Extract message for unique messages
396                    let message = {
397                        let parts: Vec<&str> = line_str.splitn(3, " - ").collect();
398                        if parts.len() >= 3 {
399                            parts[2]
400                        } else if parts.len() == 2 {
401                            parts[1]
402                        } else {
403                            line_str
404                        }
405                    };
406
407                    result.unique_messages.insert(message.to_string());
408                }
409
410                // Add time trend if requested and timestamp found
411                if collect_trends {
412                    if let Some(ts) = timestamp {
413                        *result.time_trends.entry(ts.to_string()).or_insert(0) += 1;
414                    }
415                }
416            }
417        }
418    }
419
420    // Original method for iterative processing (legacy support)
421    pub fn analyze_lines<I>(
422        &mut self,
423        lines: I,
424        pattern: Option<&Regex>,
425        level_filter: Option<&str>,
426        collect_trends: bool,
427        collect_stats: bool,
428    ) -> AnalysisResult
429    where
430        I: Iterator<Item = String>,
431    {
432        // Configure with pattern if provided
433        if let Some(pat) = pattern {
434            self.configure(Some(&pat.to_string()), level_filter);
435        } else {
436            self.configure(None, level_filter);
437        }
438
439        // Initialize result
440        let mut result = AnalysisResult {
441            matched_lines: Vec::with_capacity(1000),
442            line_counts: FxHashMap::default(),
443            count: 0,
444            time_trends: FxHashMap::default(),
445            levels_count: FxHashMap::default(),
446            error_types: FxHashMap::default(),
447            unique_messages: FxHashSet::default(),
448            deduplicated: true,
449        };
450
451        // Process all lines
452        let lines_vec: Vec<String> = lines.collect();
453        let lines_bytes: Vec<u8> = lines_vec.join("\n").into_bytes();
454
455        // Process the data as a single chunk
456        self.process_chunk_data(&lines_bytes, &mut result, collect_trends, collect_stats);
457
458        result
459    }
460
461    // New SIMD-optimized version of analyze_lines
462    pub fn analyze_lines_optimized<I>(
463        &mut self,
464        lines: I,
465        pattern: Option<&str>,
466        level_filter: Option<&str>,
467        collect_trends: bool,
468        collect_stats: bool,
469    ) -> AnalysisResult
470    where
471        I: Iterator<Item = String>,
472    {
473        // Configure with optimized pattern matcher if provided
474        if let Some(pat) = pattern {
475            self.configure_optimized(Some(pat), level_filter);
476        } else if level_filter.is_some() {
477            // If only level filter is provided, use standard configuration
478            self.configure(None, level_filter);
479        }
480
481        // Initialize result
482        let mut result = AnalysisResult {
483            matched_lines: Vec::with_capacity(1000),
484            line_counts: FxHashMap::default(),
485            count: 0,
486            time_trends: FxHashMap::default(),
487            levels_count: FxHashMap::default(),
488            error_types: FxHashMap::default(),
489            unique_messages: FxHashSet::default(),
490            deduplicated: true,
491        };
492
493        // Process all lines using SIMD-optimized line joining
494        let lines_vec: Vec<String> = lines.collect();
495        let lines_bytes: Vec<u8> = lines_vec.join("\n").into_bytes();
496
497        // Process the data as a single chunk
498        self.process_chunk_data(&lines_bytes, &mut result, collect_trends, collect_stats);
499
500        result
501    }
502
503    // Parallel processing for collections of lines (legacy support)
504    pub fn analyze_lines_parallel(
505        &mut self,
506        lines: Vec<String>,
507        pattern: Option<&Regex>,
508        level_filter: Option<&str>,
509        collect_trends: bool,
510        collect_stats: bool,
511    ) -> AnalysisResult {
512        // Configure with pattern if provided
513        if let Some(pat) = pattern {
514            self.configure(Some(&pat.to_string()), level_filter);
515        } else {
516            self.configure(None, level_filter);
517        }
518
519        // Create thread-safe shared analyzer
520        let analyzer = Arc::new(self);
521
522        // Split lines into chunks for parallel processing
523        let chunk_size = 10000; // Process in chunks of 10k lines
524        let num_chunks = lines.len().div_ceil(chunk_size);
525        let chunks: Vec<_> = (0..num_chunks)
526            .map(|i| {
527                let start = i * chunk_size;
528                let end = std::cmp::min(start + chunk_size, lines.len());
529                lines[start..end].to_vec()
530            })
531            .collect();
532
533        // Process chunks in parallel
534        let results: Vec<AnalysisResult> = chunks
535            .par_iter()
536            .map(|chunk_lines| {
537                let analyzer = Arc::clone(&analyzer);
538                let mut result = AnalysisResult {
539                    deduplicated: true,
540                    ..Default::default()
541                };
542
543                // Join lines and process as bytes
544                let lines_bytes: Vec<u8> = chunk_lines.join("\n").into_bytes();
545                analyzer.process_chunk_data(
546                    &lines_bytes,
547                    &mut result,
548                    collect_trends,
549                    collect_stats,
550                );
551
552                result
553            })
554            .collect();
555
556        // Merge results
557        let mut final_result = AnalysisResult {
558            deduplicated: true,
559            ..Default::default()
560        };
561
562        for result in results {
563            final_result.count += result.count;
564
565            // Merge line counts for deduplication
566            for (line, count) in result.line_counts {
567                let current_count = final_result.line_counts.entry(line.clone()).or_insert(0);
568                *current_count += count;
569
570                // Only add to matched_lines if we haven't exceeded our limit
571                if final_result.matched_lines.len() < MAX_UNIQUE_LINES
572                    && !final_result.matched_lines.contains(&line)
573                {
574                    final_result.matched_lines.push(line);
575                }
576            }
577
578            // Merge time trends
579            for (timestamp, count) in result.time_trends {
580                *final_result.time_trends.entry(timestamp).or_insert(0) += count;
581            }
582
583            // Merge level counts
584            for (level, count) in result.levels_count {
585                *final_result.levels_count.entry(level).or_insert(0) += count;
586            }
587
588            // Merge error types
589            for (error_type, count) in result.error_types {
590                *final_result.error_types.entry(error_type).or_insert(0) += count;
591            }
592
593            // Merge unique messages
594            final_result.unique_messages.extend(result.unique_messages);
595        }
596
597        final_result
598    }
599
600    // Parallel processing with SIMD optimizations
601    pub fn analyze_lines_parallel_optimized(
602        &mut self,
603        lines: Vec<String>,
604        pattern: Option<&str>,
605        level_filter: Option<&str>,
606        collect_trends: bool,
607        collect_stats: bool,
608    ) -> AnalysisResult {
609        // Configure with optimized pattern matcher if provided
610        if let Some(pat) = pattern {
611            self.configure_optimized(Some(pat), level_filter);
612        } else if level_filter.is_some() {
613            // If only level filter is provided, use standard configuration
614            self.configure(None, level_filter);
615        }
616
617        // Create thread-safe shared analyzer
618        let analyzer = Arc::new(self);
619
620        // Split lines into chunks for parallel processing - larger chunks for SIMD efficiency
621        let chunk_size = 20000; // Process in larger chunks for SIMD
622        let num_chunks = lines.len().div_ceil(chunk_size);
623        let chunks: Vec<_> = (0..num_chunks)
624            .map(|i| {
625                let start = i * chunk_size;
626                let end = std::cmp::min(start + chunk_size, lines.len());
627                lines[start..end].to_vec()
628            })
629            .collect();
630
631        // Process chunks in parallel with SIMD
632        let results: Vec<AnalysisResult> = chunks
633            .par_iter()
634            .map(|chunk_lines| {
635                let analyzer = Arc::clone(&analyzer);
636                let mut result = AnalysisResult {
637                    deduplicated: true,
638                    ..Default::default()
639                };
640
641                // Join lines and process as bytes
642                let lines_bytes: Vec<u8> = chunk_lines.join("\n").into_bytes();
643                analyzer.process_chunk_data(
644                    &lines_bytes,
645                    &mut result,
646                    collect_trends,
647                    collect_stats,
648                );
649
650                result
651            })
652            .collect();
653
654        // Merge results
655        let mut final_result = AnalysisResult {
656            deduplicated: true,
657            ..Default::default()
658        };
659
660        for result in results {
661            final_result.count += result.count;
662
663            // Merge line counts for deduplication
664            for (line, count) in result.line_counts {
665                let current_count = final_result.line_counts.entry(line.clone()).or_insert(0);
666                *current_count += count;
667
668                // Only add to matched_lines if we haven't exceeded our limit
669                if final_result.matched_lines.len() < MAX_UNIQUE_LINES
670                    && !final_result.matched_lines.contains(&line)
671                {
672                    final_result.matched_lines.push(line);
673                }
674            }
675
676            // Merge time trends
677            for (timestamp, count) in result.time_trends {
678                *final_result.time_trends.entry(timestamp).or_insert(0) += count;
679            }
680
681            // Merge level counts
682            for (level, count) in result.levels_count {
683                *final_result.levels_count.entry(level).or_insert(0) += count;
684            }
685
686            // Merge error types
687            for (error_type, count) in result.error_types {
688                *final_result.error_types.entry(error_type).or_insert(0) += count;
689            }
690
691            // Merge unique messages
692            final_result.unique_messages.extend(result.unique_messages);
693        }
694
695        final_result
696    }
697
698    // Memory-mapped file processing (sequential)
699    pub fn analyze_mmap(
700        &mut self,
701        mmap: &Mmap,
702        pattern: Option<&Regex>,
703        level_filter: Option<&str>,
704        collect_trends: bool,
705        collect_stats: bool,
706    ) -> AnalysisResult {
707        // Configure with pattern if provided
708        if let Some(pat) = pattern {
709            self.configure(Some(&pat.to_string()), level_filter);
710        } else {
711            self.configure(None, level_filter);
712        }
713
714        // Initialize result
715        let mut result = AnalysisResult {
716            matched_lines: Vec::with_capacity(1000),
717            line_counts: FxHashMap::default(),
718            count: 0,
719            time_trends: FxHashMap::default(),
720            levels_count: FxHashMap::default(),
721            error_types: FxHashMap::default(),
722            unique_messages: FxHashSet::default(),
723            deduplicated: true,
724        };
725
726        // Buffer for handling partial lines between chunks
727        let mut pending_line = Vec::with_capacity(4096);
728
729        // Process file in chunks
730        let mut position = 0;
731        while position < mmap.len() {
732            // Determine chunk boundaries
733            let chunk_end = std::cmp::min(position + CHUNK_SIZE, mmap.len());
734            let chunk = &mmap[position..chunk_end];
735
736            // Find the last complete line in this chunk
737            let last_newline = if chunk_end < mmap.len() {
738                match chunk.iter().rposition(|&b| b == b'\n') {
739                    Some(pos) => pos + 1, // Include the newline
740                    None => 0,            // No newline found in chunk
741                }
742            } else {
743                chunk.len() // Last chunk, process everything
744            };
745
746            // Prepare data to process (pending line + complete lines)
747            let mut process_data = Vec::with_capacity(pending_line.len() + last_newline);
748            process_data.extend_from_slice(&pending_line);
749            process_data.extend_from_slice(&chunk[..last_newline]);
750
751            // Save incomplete line for next chunk
752            pending_line.clear();
753            if last_newline < chunk.len() {
754                pending_line.extend_from_slice(&chunk[last_newline..]);
755            }
756
757            // Process the lines in this chunk
758            self.process_chunk_data(&process_data, &mut result, collect_trends, collect_stats);
759
760            // Move to next chunk
761            position += last_newline;
762        }
763
764        // Process any remaining data
765        if !pending_line.is_empty() {
766            self.process_chunk_data(&pending_line, &mut result, collect_trends, collect_stats);
767        }
768
769        result
770    }
771
772    // SIMD-optimized memory-mapped file processing
773    pub fn analyze_mmap_optimized(
774        &mut self,
775        mmap: &Mmap,
776        pattern: Option<&str>,
777        level_filter: Option<&str>,
778        collect_trends: bool,
779        collect_stats: bool,
780    ) -> AnalysisResult {
781        // Configure with pattern if provided - using SIMD optimized version
782        if let Some(pat) = pattern {
783            self.configure_optimized(Some(pat), level_filter);
784        } else if level_filter.is_some() {
785            // If only level filter, use standard configuration
786            self.configure(None, level_filter);
787        }
788
789        // Initialize result structure
790        let mut result = AnalysisResult {
791            matched_lines: Vec::with_capacity(1000),
792            line_counts: FxHashMap::default(),
793            count: 0,
794            time_trends: FxHashMap::default(),
795            levels_count: FxHashMap::default(),
796            error_types: FxHashMap::default(),
797            unique_messages: FxHashSet::default(),
798            deduplicated: true,
799        };
800
801        // Buffer for handling partial lines between chunks - larger buffer for SIMD efficiency
802        let mut pending_line = Vec::with_capacity(8192);
803
804        // Process file in chunks - use larger chunk size for SIMD efficiency
805        const SIMD_CHUNK_SIZE: usize = 4 * 1024 * 1024; // 4MB
806        let mut position = 0;
807
808        while position < mmap.len() {
809            // Determine chunk boundaries
810            let chunk_end = std::cmp::min(position + SIMD_CHUNK_SIZE, mmap.len());
811            let chunk = &mmap[position..chunk_end];
812
813            // Use memchr for fast newline search (SIMD-accelerated)
814            let last_newline = if chunk_end < mmap.len() {
815                match memchr::memchr_iter(b'\n', chunk).last() {
816                    Some(pos) => pos + 1, // Include the newline
817                    None => 0,            // No newline found in chunk
818                }
819            } else {
820                chunk.len() // Last chunk, process everything
821            };
822
823            // Prepare data to process (pending line + complete lines)
824            let mut process_data = Vec::with_capacity(pending_line.len() + last_newline);
825            process_data.extend_from_slice(&pending_line);
826            process_data.extend_from_slice(&chunk[..last_newline]);
827
828            // Save incomplete line for next chunk
829            pending_line.clear();
830            if last_newline < chunk.len() {
831                pending_line.extend_from_slice(&chunk[last_newline..]);
832            }
833
834            // Process the lines in this chunk
835            self.process_chunk_data(&process_data, &mut result, collect_trends, collect_stats);
836
837            // Move to next chunk
838            position += last_newline;
839        }
840
841        // Process any remaining data
842        if !pending_line.is_empty() {
843            self.process_chunk_data(&pending_line, &mut result, collect_trends, collect_stats);
844        }
845
846        result
847    }
848
849    // Memory-mapped file processing (parallel)
850    pub fn analyze_mmap_parallel(
851        &mut self,
852        mmap: &Mmap,
853        pattern: Option<&Regex>,
854        level_filter: Option<&str>,
855        collect_trends: bool,
856        collect_stats: bool,
857    ) -> AnalysisResult {
858        // Configure with pattern if provided
859        if let Some(pat) = pattern {
860            self.configure(Some(&pat.to_string()), level_filter);
861        } else {
862            self.configure(None, level_filter);
863        }
864
865        // Create thread-safe shared analyzer
866        let analyzer = Arc::new(self);
867
868        // Split the file into chunks for parallel processing
869        let mut chunks = Vec::new();
870        let mut chunk_start = 0;
871
872        // Identify chunk boundaries at newlines
873        while chunk_start < mmap.len() {
874            let chunk_end_approx = std::cmp::min(chunk_start + CHUNK_SIZE, mmap.len());
875
876            // Find the next newline after the approximate chunk end
877            let chunk_end = if chunk_end_approx < mmap.len() {
878                let search_end = std::cmp::min(chunk_end_approx + 1000, mmap.len());
879                match mmap[chunk_end_approx..search_end]
880                    .iter()
881                    .position(|&b| b == b'\n')
882                {
883                    Some(pos) => chunk_end_approx + pos + 1, // Include the newline
884                    None => chunk_end_approx, // No newline found, use approximate end
885                }
886            } else {
887                mmap.len() // Last chunk goes to the end
888            };
889
890            // Add chunk to list
891            chunks.push((chunk_start, chunk_end));
892            chunk_start = chunk_end;
893        }
894
895        // Process chunks in parallel
896        let results: Vec<AnalysisResult> = chunks
897            .par_iter()
898            .map(|&(start, end)| {
899                let analyzer = Arc::clone(&analyzer);
900                let chunk = &mmap[start..end];
901                let mut result = AnalysisResult {
902                    deduplicated: true,
903                    ..Default::default()
904                };
905                analyzer.process_chunk_data(chunk, &mut result, collect_trends, collect_stats);
906                result
907            })
908            .collect();
909
910        // Merge results
911        let mut final_result = AnalysisResult {
912            deduplicated: true,
913            ..Default::default()
914        };
915
916        for result in results {
917            final_result.count += result.count;
918
919            // Merge line counts for deduplication
920            for (line, count) in result.line_counts {
921                let current_count = final_result.line_counts.entry(line.clone()).or_insert(0);
922                *current_count += count;
923
924                // Only add to matched_lines if we haven't exceeded our limit
925                if final_result.matched_lines.len() < MAX_UNIQUE_LINES
926                    && !final_result.matched_lines.contains(&line)
927                {
928                    final_result.matched_lines.push(line);
929                }
930            }
931
932            // Merge time trends
933            for (timestamp, count) in result.time_trends {
934                *final_result.time_trends.entry(timestamp).or_insert(0) += count;
935            }
936
937            // Merge level counts
938            for (level, count) in result.levels_count {
939                *final_result.levels_count.entry(level).or_insert(0) += count;
940            }
941
942            // Merge error types
943            for (error_type, count) in result.error_types {
944                *final_result.error_types.entry(error_type).or_insert(0) += count;
945            }
946
947            // Merge unique messages
948            final_result.unique_messages.extend(result.unique_messages);
949        }
950
951        final_result
952    }
953
954    // SIMD-optimized parallel processing for memory-mapped files
955    pub fn analyze_mmap_parallel_optimized(
956        &mut self,
957        mmap: &Mmap,
958        pattern: Option<&str>,
959        level_filter: Option<&str>,
960        collect_trends: bool,
961        collect_stats: bool,
962    ) -> AnalysisResult {
963        // Configure with optimized pattern matcher
964        if let Some(pat) = pattern {
965            self.configure_optimized(Some(pat), level_filter);
966        } else if level_filter.is_some() {
967            // If only level filter, use standard configuration
968            self.configure(None, level_filter);
969        }
970
971        // Create thread-safe shared analyzer
972        let analyzer = Arc::new(self);
973
974        // Split the file into chunks for parallel processing
975        // Use SIMD to efficiently find newlines for chunk boundaries
976        let mut chunks = Vec::new();
977        let mut chunk_start = 0;
978        const SIMD_CHUNK_SIZE: usize = 8 * 1024 * 1024; // 8MB for better SIMD efficiency
979
980        // Identify chunk boundaries at newlines
981        while chunk_start < mmap.len() {
982            let chunk_end_approx = std::cmp::min(chunk_start + SIMD_CHUNK_SIZE, mmap.len());
983
984            // Find the next newline after the approximate chunk end
985            let chunk_end = if chunk_end_approx < mmap.len() {
986                let search_end = std::cmp::min(chunk_end_approx + 2000, mmap.len());
987                match memchr::memchr(b'\n', &mmap[chunk_end_approx..search_end]) {
988                    Some(pos) => chunk_end_approx + pos + 1, // Include the newline
989                    None => chunk_end_approx, // No newline found, use approximate end
990                }
991            } else {
992                mmap.len() // Last chunk goes to the end
993            };
994
995            // Add chunk to list
996            chunks.push((chunk_start, chunk_end));
997            chunk_start = chunk_end;
998        }
999
1000        // Process chunks in parallel
1001        let results: Vec<AnalysisResult> = chunks
1002            .par_iter()
1003            .map(|&(start, end)| {
1004                let analyzer = Arc::clone(&analyzer);
1005                let chunk = &mmap[start..end];
1006                let mut result = AnalysisResult {
1007                    deduplicated: true,
1008                    ..Default::default()
1009                };
1010                analyzer.process_chunk_data(chunk, &mut result, collect_trends, collect_stats);
1011                result
1012            })
1013            .collect();
1014
1015        // Merge results
1016        let mut final_result = AnalysisResult {
1017            deduplicated: true,
1018            ..Default::default()
1019        };
1020
1021        for result in results {
1022            final_result.count += result.count;
1023
1024            // Merge line counts for deduplication
1025            for (line, count) in result.line_counts {
1026                let current_count = final_result.line_counts.entry(line.clone()).or_insert(0);
1027                *current_count += count;
1028
1029                // Only add to matched_lines if we haven't exceeded our limit
1030                if final_result.matched_lines.len() < MAX_UNIQUE_LINES
1031                    && !final_result.matched_lines.contains(&line)
1032                {
1033                    final_result.matched_lines.push(line);
1034                }
1035            }
1036
1037            // Merge time trends
1038            for (timestamp, count) in result.time_trends {
1039                *final_result.time_trends.entry(timestamp).or_insert(0) += count;
1040            }
1041
1042            // Merge level counts
1043            for (level, count) in result.levels_count {
1044                *final_result.levels_count.entry(level).or_insert(0) += count;
1045            }
1046
1047            // Merge error types
1048            for (error_type, count) in result.error_types {
1049                *final_result.error_types.entry(error_type).or_insert(0) += count;
1050            }
1051
1052            // Merge unique messages
1053            final_result.unique_messages.extend(result.unique_messages);
1054        }
1055
1056        final_result
1057    }
1058}