use lazy_static::lazy_static;
use memmap2::Mmap;
use rayon::prelude::*;
use regex::Regex;
use rustc_hash::{FxHashMap, FxHashSet};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

/// Size of each chunk read from a memory-mapped file (1 MiB).
const CHUNK_SIZE: usize = 1_048_576;
/// Cap on the number of deduplicated lines kept in `matched_lines`.
const MAX_UNIQUE_LINES: usize = 10_000;

lazy_static! {
    static ref LEVEL_REGEX: Regex = Regex::new(
        r"\[((?i)ERROR|WARN|INFO|DEBUG|TRACE|SEVERE|WARNING|FINE)]|(?i:ERROR|WARN|INFO|DEBUG|TRACE|SEVERE|WARNING|FINE):"
    ).unwrap();

    static ref TIMESTAMP_REGEX: Regex = Regex::new(
        r"(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})"
    ).unwrap();

    static ref ERROR_TYPE_REGEX: Regex = Regex::new(
        r"([A-Za-z]+Exception|[A-Za-z]+Error|[A-Za-z]+\s+timeout|Connection timeout|500 Internal Server Error|401 Unauthorized|503 Service Unavailable)"
    ).unwrap();
}
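
// Illustrative examples of lines each regex is meant to catch (the sample
// lines are our own, not from any fixed log format):
//
//   LEVEL_REGEX:      "[ERROR] disk full"  or  "WARN: low memory"
//   TIMESTAMP_REGEX:  "2024-05-01 12:34:56 ..."
//   ERROR_TYPE_REGEX: "NullPointerException", "Connection timeout",
//                     "503 Service Unavailable"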

/// Aggregated output of one analysis pass.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct AnalysisResult {
    pub matched_lines: Vec<String>,
    pub line_counts: FxHashMap<String, usize>,
    pub count: usize,
    pub time_trends: FxHashMap<String, usize>,
    pub levels_count: FxHashMap<String, usize>,
    pub error_types: FxHashMap<String, usize>,
    pub unique_messages: FxHashSet<String>,
    pub deduplicated: bool,
}

pub trait PatternMatcher: Send + Sync {
    fn is_match(&self, text: &str) -> bool;
}

pub struct LiteralMatcher {
    pattern: String,
}

impl LiteralMatcher {
    pub fn new(pattern: &str) -> Self {
        Self {
            pattern: pattern.to_string(),
        }
    }
}

impl PatternMatcher for LiteralMatcher {
    fn is_match(&self, text: &str) -> bool {
        text.contains(&self.pattern)
    }
}

pub struct RegexMatcher {
    regex: Regex,
}

impl RegexMatcher {
    /// Panics if `pattern` is not valid regex syntax.
    pub fn new(pattern: &str) -> Self {
        Self {
            regex: Regex::new(pattern).unwrap(),
        }
    }
}

impl PatternMatcher for RegexMatcher {
    fn is_match(&self, text: &str) -> bool {
        self.regex.is_match(text)
    }
}

struct ParsedLine<'a> {
    level: &'a str,
    timestamp: Option<&'a str>,
    error_type: Option<String>,
    message: Option<&'a str>,
}

pub struct LogAnalyzer {
    pub(crate) pattern_matcher: Option<Box<dyn PatternMatcher + Send + Sync>>,
    pub(crate) level_filter_lowercase: Option<String>,
}

impl Default for LogAnalyzer {
    fn default() -> Self {
        Self::new()
    }
}

impl LogAnalyzer {
    pub fn new() -> Self {
        LogAnalyzer {
            pattern_matcher: None,
            level_filter_lowercase: None,
        }
    }

    pub fn configure(&mut self, pattern: Option<&str>, level_filter: Option<&str>) {
        self.pattern_matcher = pattern.map(|p| {
            if Self::is_complex_pattern(p) {
                Box::new(RegexMatcher::new(p)) as Box<dyn PatternMatcher + Send + Sync>
            } else {
                Box::new(LiteralMatcher::new(p)) as Box<dyn PatternMatcher + Send + Sync>
            }
        });

        self.level_filter_lowercase = level_filter.map(|l| l.to_lowercase());
    }

    pub fn configure_optimized(&mut self, pattern: Option<&str>, level_filter: Option<&str>) {
        self.pattern_matcher = pattern.map(crate::accelerated::PatternMatcherFactory::create);

        self.level_filter_lowercase = level_filter.map(|l| l.to_lowercase());
    }

    /// Treats a pattern as a regex if it contains any regex metacharacter.
    fn is_complex_pattern(pattern: &str) -> bool {
        pattern.contains(|c: char| {
            matches!(c, '*' | '?' | '[' | '(' | '|' | '+' | '.' | '^' | '$')
        })
    }
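
    // For illustration (the pattern strings are our own examples):
    // configure(Some("timeout"), None) selects a LiteralMatcher, while
    // configure(Some("time(out|r)"), None) contains '(' and '|' and therefore
    // selects a RegexMatcher.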

    /// Cheap substring pre-filter; the precise, regex-based level check
    /// happens later against the extracted level.
    fn quick_level_match(&self, line: &str) -> bool {
        let Some(filter) = self.level_filter_lowercase.as_deref() else {
            return true;
        };

        match filter {
            "error" => line.contains("ERROR") || line.contains("error"),
            "warn" => line.contains("WARN") || line.contains("warn") || line.contains("WARNING"),
            "info" => line.contains("INFO") || line.contains("info"),
            "debug" => line.contains("DEBUG") || line.contains("debug"),
            "trace" => line.contains("TRACE") || line.contains("trace"),
            _ => true,
        }
    }

    fn parse_line<'a>(
        &self,
        line: &'a str,
        need_timestamp: bool,
        need_stats: bool,
    ) -> ParsedLine<'a> {
        let mut parsed = ParsedLine {
            level: "",
            timestamp: None,
            error_type: None,
            message: None,
        };

        if let Some(caps) = LEVEL_REGEX.captures(line) {
            parsed.level = caps
                .get(1)
                .map_or_else(|| caps.get(0).map_or("", |m| m.as_str()), |m| m.as_str());
        }

        if need_timestamp {
            if let Some(caps) = TIMESTAMP_REGEX.captures(line) {
                if let Some(timestamp) = caps.get(1) {
                    let timestamp_str = timestamp.as_str();
                    // Truncate to "YYYY-MM-DD HH" (13 chars) so trends bucket by hour.
                    if timestamp_str.len() >= 13 {
                        parsed.timestamp = Some(&timestamp_str[0..13]);
                    } else {
                        parsed.timestamp = Some(timestamp_str);
                    }
                }
            }
        }

        if need_stats {
            parsed.message = self.extract_message(line);

            if let Some(caps) = ERROR_TYPE_REGEX.captures(line) {
                if let Some(error_type) = caps.get(1) {
                    parsed.error_type = Some(error_type.as_str().to_string());
                }
            } else if line.contains("Failed to") {
                let parts: Vec<&str> = line.split("Failed to ").collect();
                if parts.len() > 1 {
                    let action_parts: Vec<&str> = parts[1].split(':').collect();
                    if !action_parts.is_empty() {
                        let action = action_parts[0].trim();
                        parsed.error_type = Some(format!("Failed to {}", action));
                    }
                }
            }
        }

        parsed
    }

    fn extract_message<'a>(&self, line: &'a str) -> Option<&'a str> {
        // "TIMESTAMP - LEVEL - message" yields "message"; a single " - " yields
        // the second part; otherwise the whole line is treated as the message.
        let parts: Vec<&str> = line.splitn(3, " - ").collect();
        if parts.len() >= 3 {
            Some(parts[2])
        } else if parts.len() == 2 {
            Some(parts[1])
        } else {
            Some(line)
        }
    }

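    /// Checks a single line against an optional regex and level filter and
    /// returns `(line, LEVEL, timestamp)` when both match.
    ///
    /// A minimal sketch of the call shape (the sample line is our own, hence
    /// `ignore`):
    ///
    /// ```ignore
    /// let analyzer = LogAnalyzer::new();
    /// let hit = analyzer.analyze_line(
    ///     "2024-05-01 12:00:00 [ERROR] Failed to connect: refused",
    ///     None,          // no pattern filter
    ///     Some("ERROR"), // level filter
    ///     true,          // collect_trends: extract the timestamp
    ///     true,          // collect_stats: extract the error type
    /// );
    /// assert!(hit.is_some());
    /// ```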
    pub fn analyze_line(
        &self,
        line: &str,
        pattern: Option<&Regex>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> Option<(String, String, Option<String>)> {
        let parsed = self.parse_line(line, collect_trends, collect_stats);

        let level_matches = match level_filter {
            None => true,
            Some(filter_level) => {
                !parsed.level.is_empty()
                    && parsed.level.to_uppercase() == filter_level.to_uppercase()
            }
        };

        let pattern_matches = match pattern {
            None => true,
            Some(re) => re.is_match(line),
        };

        if level_matches && pattern_matches {
            let timestamp = parsed.timestamp.map(String::from);

            return Some((line.to_string(), parsed.level.to_uppercase(), timestamp));
        }

        None
    }

    pub fn extract_error_type(&self, line: &str) -> Option<String> {
        let parsed = self.parse_line(line, false, true);
        parsed.error_type
    }

    pub fn process_chunk_data(
        &self,
        data: &[u8],
        result: &mut AnalysisResult,
        collect_trends: bool,
        collect_stats: bool,
    ) {
        for line in data.split(|&b| b == b'\n').filter(|l| !l.is_empty()) {
            // Lines that are not valid UTF-8 are skipped.
            let line_str = match std::str::from_utf8(line) {
                Ok(s) => s,
                Err(_) => continue,
            };

            if let Some(matcher) = &self.pattern_matcher {
                if !matcher.is_match(line_str) {
                    continue;
                }
            }

            // Fast substring pre-filter; the exact level check happens below.
            if !self.quick_level_match(line_str) {
                continue;
            }

            let level = if self.level_filter_lowercase.is_some() || collect_stats {
                if let Some(caps) = LEVEL_REGEX.captures(line_str) {
                    caps.get(1)
                        .map_or_else(|| caps.get(0).map_or("", |m| m.as_str()), |m| m.as_str())
                } else {
                    ""
                }
            } else {
                ""
            };

            if let Some(filter) = &self.level_filter_lowercase {
                if level.to_lowercase() != *filter {
                    continue;
                }
            }

            result.count += 1;

            let line_string = line_str.to_string();
            let entry = result.line_counts.entry(line_string.clone()).or_insert(0);
            *entry += 1;

            // Only the first occurrence of a line is kept, up to MAX_UNIQUE_LINES.
            let is_first_occurrence = *entry == 1;
            let within_limit = result.matched_lines.len() < MAX_UNIQUE_LINES;

            if is_first_occurrence && within_limit {
                result.matched_lines.push(line_string);
            }

            if collect_stats || collect_trends {
                let timestamp = if collect_trends {
                    TIMESTAMP_REGEX.captures(line_str).and_then(|caps| {
                        caps.get(1).map(|m| {
                            let ts = m.as_str();
                            // Truncate to "YYYY-MM-DD HH" for hourly buckets.
                            if ts.len() >= 13 {
                                &ts[0..13]
                            } else {
                                ts
                            }
                        })
                    })
                } else {
                    None
                };

                if collect_stats {
                    if !level.is_empty() {
                        let level_upper = level.to_uppercase();
                        *result.levels_count.entry(level_upper).or_insert(0) += 1;
                    }

                    let error_type = if let Some(caps) = ERROR_TYPE_REGEX.captures(line_str) {
                        caps.get(1).map(|m| m.as_str().to_string())
                    } else if line_str.contains("Failed to") {
                        let parts: Vec<&str> = line_str.split("Failed to ").collect();
                        if parts.len() > 1 {
                            let action_parts: Vec<&str> = parts[1].split(':').collect();
                            if !action_parts.is_empty() {
                                let action = action_parts[0].trim();
                                Some(format!("Failed to {}", action))
                            } else {
                                None
                            }
                        } else {
                            None
                        }
                    } else {
                        None
                    };

                    if let Some(error) = error_type {
                        *result.error_types.entry(error).or_insert(0) += 1;
                    }

                    let message = {
                        let parts: Vec<&str> = line_str.splitn(3, " - ").collect();
                        if parts.len() >= 3 {
                            parts[2]
                        } else if parts.len() == 2 {
                            parts[1]
                        } else {
                            line_str
                        }
                    };

                    result.unique_messages.insert(message.to_string());
                }

                if collect_trends {
                    if let Some(ts) = timestamp {
                        *result.time_trends.entry(ts.to_string()).or_insert(0) += 1;
                    }
                }
            }
        }
    }

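    /// Analyzes an iterator of owned lines by joining them into a single
    /// newline-separated buffer and running `process_chunk_data` over it.
    ///
    /// A usage sketch (`logs` is a hypothetical `Vec<String>`):
    ///
    /// ```ignore
    /// let mut analyzer = LogAnalyzer::new();
    /// let result = analyzer.analyze_lines(logs.into_iter(), None, Some("ERROR"), true, true);
    /// println!("{} matching lines", result.count);
    /// ```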
    pub fn analyze_lines<I>(
        &mut self,
        lines: I,
        pattern: Option<&Regex>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> AnalysisResult
    where
        I: Iterator<Item = String>,
    {
        if let Some(pat) = pattern {
            self.configure(Some(&pat.to_string()), level_filter);
        } else {
            self.configure(None, level_filter);
        }

        let mut result = AnalysisResult {
            matched_lines: Vec::with_capacity(1000),
            line_counts: FxHashMap::default(),
            count: 0,
            time_trends: FxHashMap::default(),
            levels_count: FxHashMap::default(),
            error_types: FxHashMap::default(),
            unique_messages: FxHashSet::default(),
            deduplicated: true,
        };

        let lines_vec: Vec<String> = lines.collect();
        let lines_bytes: Vec<u8> = lines_vec.join("\n").into_bytes();

        self.process_chunk_data(&lines_bytes, &mut result, collect_trends, collect_stats);

        result
    }

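    /// Variant of [`Self::analyze_lines`] that takes the pattern as a plain
    /// `&str` and builds the matcher through
    /// `crate::accelerated::PatternMatcherFactory` instead of compiling a
    /// `Regex` up front.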
    pub fn analyze_lines_optimized<I>(
        &mut self,
        lines: I,
        pattern: Option<&str>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> AnalysisResult
    where
        I: Iterator<Item = String>,
    {
        // With neither a pattern nor a level filter, any previous configuration
        // is left in place.
        if let Some(pat) = pattern {
            self.configure_optimized(Some(pat), level_filter);
        } else if level_filter.is_some() {
            self.configure(None, level_filter);
        }

        let mut result = AnalysisResult {
            matched_lines: Vec::with_capacity(1000),
            line_counts: FxHashMap::default(),
            count: 0,
            time_trends: FxHashMap::default(),
            levels_count: FxHashMap::default(),
            error_types: FxHashMap::default(),
            unique_messages: FxHashSet::default(),
            deduplicated: true,
        };

        let lines_vec: Vec<String> = lines.collect();
        let lines_bytes: Vec<u8> = lines_vec.join("\n").into_bytes();

        self.process_chunk_data(&lines_bytes, &mut result, collect_trends, collect_stats);

        result
    }

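    /// Splits the input into 10,000-line chunks, analyzes them in parallel with
    /// rayon, and merges the per-chunk results (counts are summed; matched
    /// lines are re-deduplicated up to `MAX_UNIQUE_LINES`).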
    pub fn analyze_lines_parallel(
        &mut self,
        lines: Vec<String>,
        pattern: Option<&Regex>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> AnalysisResult {
        if let Some(pat) = pattern {
            self.configure(Some(&pat.to_string()), level_filter);
        } else {
            self.configure(None, level_filter);
        }

        let analyzer = Arc::new(self);

        let chunk_size = 10_000;
        let num_chunks = lines.len().div_ceil(chunk_size);
        let chunks: Vec<_> = (0..num_chunks)
            .map(|i| {
                let start = i * chunk_size;
                let end = std::cmp::min(start + chunk_size, lines.len());
                lines[start..end].to_vec()
            })
            .collect();

        let results: Vec<AnalysisResult> = chunks
            .par_iter()
            .map(|chunk_lines| {
                let analyzer = Arc::clone(&analyzer);
                let mut result = AnalysisResult {
                    deduplicated: true,
                    ..Default::default()
                };

                let lines_bytes: Vec<u8> = chunk_lines.join("\n").into_bytes();
                analyzer.process_chunk_data(
                    &lines_bytes,
                    &mut result,
                    collect_trends,
                    collect_stats,
                );

                result
            })
            .collect();

        let mut final_result = AnalysisResult {
            deduplicated: true,
            ..Default::default()
        };

        for result in results {
            final_result.count += result.count;

            for (line, count) in result.line_counts {
                let current_count = final_result.line_counts.entry(line.clone()).or_insert(0);
                *current_count += count;

                // Re-deduplicate across chunks; the linear `contains` scan is
                // bounded by MAX_UNIQUE_LINES.
                if final_result.matched_lines.len() < MAX_UNIQUE_LINES
                    && !final_result.matched_lines.contains(&line)
                {
                    final_result.matched_lines.push(line);
                }
            }

            for (timestamp, count) in result.time_trends {
                *final_result.time_trends.entry(timestamp).or_insert(0) += count;
            }

            for (level, count) in result.levels_count {
                *final_result.levels_count.entry(level).or_insert(0) += count;
            }

            for (error_type, count) in result.error_types {
                *final_result.error_types.entry(error_type).or_insert(0) += count;
            }

            final_result.unique_messages.extend(result.unique_messages);
        }

        final_result
    }

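    /// Parallel variant that uses the accelerated matcher factory and larger
    /// 20,000-line chunks to amortize per-chunk overhead.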
    pub fn analyze_lines_parallel_optimized(
        &mut self,
        lines: Vec<String>,
        pattern: Option<&str>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> AnalysisResult {
        if let Some(pat) = pattern {
            self.configure_optimized(Some(pat), level_filter);
        } else if level_filter.is_some() {
            self.configure(None, level_filter);
        }

        let analyzer = Arc::new(self);

        let chunk_size = 20_000;
        let num_chunks = lines.len().div_ceil(chunk_size);
        let chunks: Vec<_> = (0..num_chunks)
            .map(|i| {
                let start = i * chunk_size;
                let end = std::cmp::min(start + chunk_size, lines.len());
                lines[start..end].to_vec()
            })
            .collect();

        let results: Vec<AnalysisResult> = chunks
            .par_iter()
            .map(|chunk_lines| {
                let analyzer = Arc::clone(&analyzer);
                let mut result = AnalysisResult {
                    deduplicated: true,
                    ..Default::default()
                };

                let lines_bytes: Vec<u8> = chunk_lines.join("\n").into_bytes();
                analyzer.process_chunk_data(
                    &lines_bytes,
                    &mut result,
                    collect_trends,
                    collect_stats,
                );

                result
            })
            .collect();

        let mut final_result = AnalysisResult {
            deduplicated: true,
            ..Default::default()
        };

        for result in results {
            final_result.count += result.count;

            for (line, count) in result.line_counts {
                let current_count = final_result.line_counts.entry(line.clone()).or_insert(0);
                *current_count += count;

                if final_result.matched_lines.len() < MAX_UNIQUE_LINES
                    && !final_result.matched_lines.contains(&line)
                {
                    final_result.matched_lines.push(line);
                }
            }

            for (timestamp, count) in result.time_trends {
                *final_result.time_trends.entry(timestamp).or_insert(0) += count;
            }

            for (level, count) in result.levels_count {
                *final_result.levels_count.entry(level).or_insert(0) += count;
            }

            for (error_type, count) in result.error_types {
                *final_result.error_types.entry(error_type).or_insert(0) += count;
            }

            final_result.unique_messages.extend(result.unique_messages);
        }

        final_result
    }

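    /// Streams a memory-mapped file in `CHUNK_SIZE` pieces, carrying any
    /// trailing partial line over to the next chunk so no line is split.
    ///
    /// A minimal usage sketch (the path is hypothetical; error handling
    /// elided):
    ///
    /// ```ignore
    /// let file = std::fs::File::open("app.log")?;
    /// let mmap = unsafe { memmap2::Mmap::map(&file)? };
    /// let mut analyzer = LogAnalyzer::new();
    /// let result = analyzer.analyze_mmap(&mmap, None, Some("ERROR"), true, true);
    /// ```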
    pub fn analyze_mmap(
        &mut self,
        mmap: &Mmap,
        pattern: Option<&Regex>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> AnalysisResult {
        if let Some(pat) = pattern {
            self.configure(Some(&pat.to_string()), level_filter);
        } else {
            self.configure(None, level_filter);
        }

        let mut result = AnalysisResult {
            matched_lines: Vec::with_capacity(1000),
            line_counts: FxHashMap::default(),
            count: 0,
            time_trends: FxHashMap::default(),
            levels_count: FxHashMap::default(),
            error_types: FxHashMap::default(),
            unique_messages: FxHashSet::default(),
            deduplicated: true,
        };

        // Holds a partial line carried over from the previous chunk.
        let mut pending_line = Vec::with_capacity(4096);

        let mut position = 0;
        while position < mmap.len() {
            let chunk_end = std::cmp::min(position + CHUNK_SIZE, mmap.len());
            let chunk = &mmap[position..chunk_end];

            // Process only up to the last complete line in this chunk.
            let last_newline = if chunk_end < mmap.len() {
                match chunk.iter().rposition(|&b| b == b'\n') {
                    Some(pos) => pos + 1,
                    None => 0,
                }
            } else {
                chunk.len()
            };

            let mut process_data = Vec::with_capacity(pending_line.len() + last_newline);
            process_data.extend_from_slice(&pending_line);
            process_data.extend_from_slice(&chunk[..last_newline]);

            pending_line.clear();
            if last_newline < chunk.len() {
                pending_line.extend_from_slice(&chunk[last_newline..]);
            }

            self.process_chunk_data(&process_data, &mut result, collect_trends, collect_stats);

            // Advance past the entire chunk: the tail after the last newline
            // was saved in `pending_line`, so advancing only by `last_newline`
            // would re-read and double-count those bytes.
            position = chunk_end;
        }

        if !pending_line.is_empty() {
            self.process_chunk_data(&pending_line, &mut result, collect_trends, collect_stats);
        }

        result
    }

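    /// Variant of [`Self::analyze_mmap`] that reads 4 MiB chunks and locates
    /// line boundaries with `memchr`'s SIMD-accelerated search.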
    pub fn analyze_mmap_optimized(
        &mut self,
        mmap: &Mmap,
        pattern: Option<&str>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> AnalysisResult {
        if let Some(pat) = pattern {
            self.configure_optimized(Some(pat), level_filter);
        } else if level_filter.is_some() {
            self.configure(None, level_filter);
        }

        let mut result = AnalysisResult {
            matched_lines: Vec::with_capacity(1000),
            line_counts: FxHashMap::default(),
            count: 0,
            time_trends: FxHashMap::default(),
            levels_count: FxHashMap::default(),
            error_types: FxHashMap::default(),
            unique_messages: FxHashSet::default(),
            deduplicated: true,
        };

        let mut pending_line = Vec::with_capacity(8192);

        // 4 MiB chunks; memchr locates newlines with SIMD acceleration.
        const SIMD_CHUNK_SIZE: usize = 4 * 1024 * 1024;
        let mut position = 0;

        while position < mmap.len() {
            let chunk_end = std::cmp::min(position + SIMD_CHUNK_SIZE, mmap.len());
            let chunk = &mmap[position..chunk_end];

            let last_newline = if chunk_end < mmap.len() {
                // Reverse search for the last newline in the chunk.
                match memchr::memrchr(b'\n', chunk) {
                    Some(pos) => pos + 1,
                    None => 0,
                }
            } else {
                chunk.len()
            };

            let mut process_data = Vec::with_capacity(pending_line.len() + last_newline);
            process_data.extend_from_slice(&pending_line);
            process_data.extend_from_slice(&chunk[..last_newline]);

            pending_line.clear();
            if last_newline < chunk.len() {
                pending_line.extend_from_slice(&chunk[last_newline..]);
            }

            self.process_chunk_data(&process_data, &mut result, collect_trends, collect_stats);

            // See analyze_mmap: advance past the whole chunk; the unprocessed
            // tail is carried in `pending_line`.
            position = chunk_end;
        }

        if !pending_line.is_empty() {
            self.process_chunk_data(&pending_line, &mut result, collect_trends, collect_stats);
        }

        result
    }

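    /// Splits the mmap into roughly `CHUNK_SIZE` pieces aligned to line
    /// boundaries, analyzes them in parallel with rayon, and merges the
    /// per-chunk results.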
    pub fn analyze_mmap_parallel(
        &mut self,
        mmap: &Mmap,
        pattern: Option<&Regex>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> AnalysisResult {
        if let Some(pat) = pattern {
            self.configure(Some(&pat.to_string()), level_filter);
        } else {
            self.configure(None, level_filter);
        }

        let analyzer = Arc::new(self);

        let mut chunks = Vec::new();
        let mut chunk_start = 0;

        while chunk_start < mmap.len() {
            let chunk_end_approx = std::cmp::min(chunk_start + CHUNK_SIZE, mmap.len());

            // Extend the chunk to the next newline (searching up to 1000 bytes
            // past the nominal boundary) so lines are not split across chunks;
            // if none is found, fall back to a hard split.
            let chunk_end = if chunk_end_approx < mmap.len() {
                let search_end = std::cmp::min(chunk_end_approx + 1000, mmap.len());
                match mmap[chunk_end_approx..search_end]
                    .iter()
                    .position(|&b| b == b'\n')
                {
                    Some(pos) => chunk_end_approx + pos + 1,
                    None => chunk_end_approx,
                }
            } else {
                mmap.len()
            };

            chunks.push((chunk_start, chunk_end));
            chunk_start = chunk_end;
        }

        let results: Vec<AnalysisResult> = chunks
            .par_iter()
            .map(|&(start, end)| {
                let analyzer = Arc::clone(&analyzer);
                let chunk = &mmap[start..end];
                let mut result = AnalysisResult {
                    deduplicated: true,
                    ..Default::default()
                };
                analyzer.process_chunk_data(chunk, &mut result, collect_trends, collect_stats);
                result
            })
            .collect();

        let mut final_result = AnalysisResult {
            deduplicated: true,
            ..Default::default()
        };

        for result in results {
            final_result.count += result.count;

            for (line, count) in result.line_counts {
                let current_count = final_result.line_counts.entry(line.clone()).or_insert(0);
                *current_count += count;

                if final_result.matched_lines.len() < MAX_UNIQUE_LINES
                    && !final_result.matched_lines.contains(&line)
                {
                    final_result.matched_lines.push(line);
                }
            }

            for (timestamp, count) in result.time_trends {
                *final_result.time_trends.entry(timestamp).or_insert(0) += count;
            }

            for (level, count) in result.levels_count {
                *final_result.levels_count.entry(level).or_insert(0) += count;
            }

            for (error_type, count) in result.error_types {
                *final_result.error_types.entry(error_type).or_insert(0) += count;
            }

            final_result.unique_messages.extend(result.unique_messages);
        }

        final_result
    }

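    /// Parallel mmap variant using the accelerated matcher factory, 8 MiB
    /// chunks, and `memchr` for boundary alignment.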
    pub fn analyze_mmap_parallel_optimized(
        &mut self,
        mmap: &Mmap,
        pattern: Option<&str>,
        level_filter: Option<&str>,
        collect_trends: bool,
        collect_stats: bool,
    ) -> AnalysisResult {
        if let Some(pat) = pattern {
            self.configure_optimized(Some(pat), level_filter);
        } else if level_filter.is_some() {
            self.configure(None, level_filter);
        }

        let analyzer = Arc::new(self);

        // 8 MiB chunks keep per-task overhead low on large files.
        const SIMD_CHUNK_SIZE: usize = 8 * 1024 * 1024;

        let mut chunks = Vec::new();
        let mut chunk_start = 0;

        while chunk_start < mmap.len() {
            let chunk_end_approx = std::cmp::min(chunk_start + SIMD_CHUNK_SIZE, mmap.len());

            // Align the boundary to the next newline (searching up to 2000
            // bytes ahead) so lines are not split across chunks.
            let chunk_end = if chunk_end_approx < mmap.len() {
                let search_end = std::cmp::min(chunk_end_approx + 2000, mmap.len());
                match memchr::memchr(b'\n', &mmap[chunk_end_approx..search_end]) {
                    Some(pos) => chunk_end_approx + pos + 1,
                    None => chunk_end_approx,
                }
            } else {
                mmap.len()
            };

            chunks.push((chunk_start, chunk_end));
            chunk_start = chunk_end;
        }

        let results: Vec<AnalysisResult> = chunks
            .par_iter()
            .map(|&(start, end)| {
                let analyzer = Arc::clone(&analyzer);
                let chunk = &mmap[start..end];
                let mut result = AnalysisResult {
                    deduplicated: true,
                    ..Default::default()
                };
                analyzer.process_chunk_data(chunk, &mut result, collect_trends, collect_stats);
                result
            })
            .collect();

        let mut final_result = AnalysisResult {
            deduplicated: true,
            ..Default::default()
        };

        for result in results {
            final_result.count += result.count;

            for (line, count) in result.line_counts {
                let current_count = final_result.line_counts.entry(line.clone()).or_insert(0);
                *current_count += count;

                if final_result.matched_lines.len() < MAX_UNIQUE_LINES
                    && !final_result.matched_lines.contains(&line)
                {
                    final_result.matched_lines.push(line);
                }
            }

            for (timestamp, count) in result.time_trends {
                *final_result.time_trends.entry(timestamp).or_insert(0) += count;
            }

            for (level, count) in result.levels_count {
                *final_result.levels_count.entry(level).or_insert(0) += count;
            }

            for (error_type, count) in result.error_types {
                *final_result.error_types.entry(error_type).or_insert(0) += count;
            }

            final_result.unique_messages.extend(result.unique_messages);
        }

        final_result
    }
}
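
// A minimal sanity-check sketch. The log line format below is our own
// assumption (bracketed level, "Failed to <action>: <detail>" message);
// adjust it to the real format before relying on this test.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn level_filter_and_stats_on_a_single_error_line() {
        let mut analyzer = LogAnalyzer::new();
        let lines = vec![
            "2024-05-01 12:00:00 [ERROR] Failed to connect: refused".to_string(),
            "2024-05-01 12:00:01 [INFO] Server started".to_string(),
        ];

        let result = analyzer.analyze_lines(lines.into_iter(), None, Some("ERROR"), true, true);

        // Only the ERROR line survives the level filter.
        assert_eq!(result.count, 1);
        assert_eq!(result.matched_lines.len(), 1);
        assert_eq!(result.levels_count.get("ERROR"), Some(&1));
        // "Failed to connect" comes from the fallback error-type parser.
        assert_eq!(result.error_types.get("Failed to connect"), Some(&1));
        // Timestamps are truncated to hour granularity for trend buckets.
        assert_eq!(result.time_trends.get("2024-05-01 12"), Some(&1));
    }
}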