// log_analyzer/services/log_service.rs

use std::ops::Range;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Result};
use flume::Sender;
use log_source::source::log_source::{create_source, LogSource, SourceType};
use pariter::{scope, IteratorExt as _};
use regex::Regex;
use tokio::sync::broadcast;

use crate::domain::apply_filters::apply_filters;
use crate::domain::apply_format::apply_format;
use crate::domain::apply_search::{apply_search, format_search};
use crate::models::filter::LogFilter;
use crate::models::log_line_styled::LogLineStyled;
use crate::models::{filter::Filter, format::Format, log_line::LogLine};
use crate::stores::analysis_store::AnalysisStore;
use crate::stores::log_store::LogStore;
use crate::stores::processing_store::ProcessingStore;
22
/// Notify of state changes
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Event {
    /// Currently processing lines (from, to)
    Processing(usize, usize),
    /// New lines processed (from, to)
    NewLines(usize, usize),
    /// New search lines processed (from, to)
    NewSearchLines(usize, usize),
    /// Currently busy filtering
    Filtering,
    /// Finished filtering
    FilterFinished,
    /// Currently busy searching
    Searching,
    /// Finished search
    SearchFinished,
}
41
42/// Main API of this crate
43pub trait LogAnalyzer {
44    /// Add a new log source to the analysis
45    fn add_log(
46        &self,
47        source_type: usize,
48        source_address: &str,
49        format: Option<&String>,
50    ) -> Result<()>;
51    /// Add a new format to the list of available formats
52    fn add_format(&self, alias: &str, regex: &str) -> Result<()>;
53    /// Start a new search
54    fn add_search(&self, regex: &str);
55    /// Add a new filter to the list of available filters
56    fn add_filter(&self, filter: Filter);
57    /// Get log lines between the range [from, to]
58    fn get_log_lines(&self, from: usize, to: usize) -> Vec<LogLine>;
59    /// Get search lines between the range [from, to]
60    fn get_search_lines(&self, from: usize, to: usize) -> Vec<LogLineStyled>;
61    /// Get a list of log lines of `elements` size centered on the `line` element or the closest
62    /// Returns (elements, offset, index)
63    fn get_log_lines_containing(
64        &self,
65        index: usize,
66        elements: usize,
67    ) -> (Vec<LogLine>, usize, usize);
68
69    /// Get a list of log lines of `elements` size centered on the `line` element or the closest
70    /// Returns (elements, offset, index)
71    fn get_search_lines_containing(
72        &self,
73        index: usize,
74        elements: usize,
75    ) -> (Vec<LogLineStyled>, usize, usize);
76
77    /// Get the current managed logs
78    /// Returns a vector of (enabled, log_path, Option<format>)
79    fn get_logs(&self) -> Vec<(bool, String, Option<String>)>;
80
81    /// Get all the available formats
82    fn get_formats(&self) -> Vec<Format>;
83    /// Get all the available filters together with their enabled state
84    fn get_filters(&self) -> Vec<(bool, Filter)>;
85    /// Get how many lines are in the raw logs
86    fn get_total_raw_lines(&self) -> usize;
87    /// Get how many lines are in the filtered log
88    fn get_total_filtered_lines(&self) -> usize;
89    /// Get how many lines are in the search log
90    fn get_total_searched_lines(&self) -> usize;
91    /// Enable or disable the given source
92    fn toggle_source(&self, id: &str);
93    /// Enable or disable the given filter
94    fn toggle_filter(&self, id: &str);
95    fn on_event(&self) -> broadcast::Receiver<Event>;
96}
97
98pub struct LogService {
99    log_store: Arc<dyn LogStore + Sync + Send>,
100    processing_store: Arc<dyn ProcessingStore + Sync + Send>,
101    analysis_store: Arc<dyn AnalysisStore + Sync + Send>,
102    log_sender: Sender<(String, Vec<String>)>,
103    event_channel: broadcast::Sender<Event>,
104}
105
106impl LogService {
107    /// Instantiates the service and starts the consumer thread.
108    ///
109    /// The consumer thread continuously listens to lines from log sources and applies
110    /// a chain of operations
111    /// * apply format
112    /// * apply filters
113    /// * apply search
114    pub fn new(
115        log_store: Arc<dyn LogStore + Sync + Send>,
116        processing_store: Arc<dyn ProcessingStore + Sync + Send>,
117        analysis_store: Arc<dyn AnalysisStore + Sync + Send>,
118    ) -> Arc<Self> {
119        let (sender, receiver) = flume::bounded(1_000_000_usize);
120        let (broadcast_sender, _broadcast_receiver) = broadcast::channel(1_000_000_usize);
121
122        let log_service = Arc::new(Self {
123            log_store,
124            processing_store,
125            analysis_store,
126            log_sender: sender,
127            event_channel: broadcast_sender,
128        });
129
130        let log = log_service.clone();
131        let event_sender = log_service.event_channel.clone();
132        std::thread::Builder::new()
133            .name("Consumer".to_string())
134            .spawn(move || loop {
135                let num_cpus = num_cpus::get();
136                while let Ok((path, lines)) = receiver.recv() {
137                    let (format, indexes, lines) = log.process_raw_lines(&path, lines);
138
139                    if !lines.is_empty() {
140                        let chunk_size = lines.len() / num_cpus;
141
142                        let elements: Vec<(String, usize)> = lines
143                            .into_iter()
144                            .zip(indexes)
145                            .map(|(line, index)| (line, index))
146                            .collect();
147
148                        let first_index = elements[0].1;
149                        let last_index = elements.last().unwrap().1;
150                        event_sender
151                            .send(Event::Processing(first_index, last_index))
152                            .unwrap_or_default();
153
154                        scope(|scope| {
155                            // Split the lines to process in equal chunks to be processed in parallel
156                            let processed: Vec<(Vec<LogLine>, Vec<LogLine>)> = elements
157                                .chunks(chunk_size.max(num_cpus))
158                                .parallel_map_scoped(scope, |chunk| {
159                                    let lines = log.apply_format(&format, &path, chunk);
160                                    let filtered_lines = log.apply_filters(lines);
161                                    let (filtered, search) = log.apply_search(filtered_lines);
162                                    (filtered, search)
163                                })
164                                .collect();
165
166                            // Store the processed lines in the analysis store
167                            for (filtered, search) in processed {
168                                log.analysis_store.add_lines(&filtered);
169                                log.analysis_store.add_search_lines(&search);
170                            }
171
172                            // Notify of the processed lines
173                            event_sender
174                                .send(Event::NewLines(first_index, last_index))
175                                .unwrap_or_default();
176                            event_sender
177                                .send(Event::NewSearchLines(first_index, last_index))
178                                .unwrap_or_default();
179                        })
180                        .unwrap();
181                    }
182                }
183            })
184            .unwrap();
185
186        log_service
187    }
188
189    /// Store the raw received lines in memory and retrieve if there is a format for this log
190    fn process_raw_lines(
191        &self,
192        path: &str,
193        lines: Vec<String>,
194    ) -> (Option<String>, Range<usize>, Vec<String>) {
195        let indexes = self.log_store.add_lines(path, &lines);
196        let format = self.log_store.get_format(path);
197        (format, indexes, lines)
198    }
199
200    /// Apply formatting (if any) to a list of lines and return the formated `LogLine`
201    fn apply_format(
202        &self,
203        format: &Option<String>,
204        path: &str,
205        line_index: &[(String, usize)],
206    ) -> Vec<LogLine> {
207        let mut format_regex = None;
208
209        if let Some(format) = format {
210            let format = self.processing_store.get_format(format);
211            format_regex = format.map(|format| Regex::new(&format).unwrap());
212        }
213
214        let mut log_lines: Vec<LogLine> = Vec::with_capacity(line_index.len());
215        for (line, index) in line_index {
216            let log_line = apply_format(&format_regex.as_ref(), path, line, *index);
217            log_lines.push(log_line);
218        }
219        log_lines
220    }
221
222    /// Apply filters (if any) to a list of `LogLine` and return the filtered list of `LogLine`
223    fn apply_filters(&self, lines: Vec<LogLine>) -> Vec<LogLine> {
224        let filters: Vec<LogFilter> = self
225            .processing_store
226            .get_filters()
227            .into_iter()
228            .filter(|(enabled, _)| *enabled)
229            .map(|(_, filter)| filter.into())
230            .collect();
231
232        let mut filtered_lines: Vec<LogLine> = Vec::with_capacity(lines.len());
233        for line in lines {
234            if let Some(filtered_line) = apply_filters(&filters, line) {
235                filtered_lines.push(filtered_line);
236            }
237        }
238        filtered_lines
239    }
240
241    /// Apply the search query (if any) to a list of `LogLine` and return both the received lines and the searched ones
242    fn apply_search(&self, lines: Vec<LogLine>) -> (Vec<LogLine>, Vec<LogLine>) {
243        let mut search_lines: Vec<LogLine> = Vec::with_capacity(lines.len());
244        if let Some(search_query) = self.analysis_store.get_search_query() {
245            if let Ok(search_regex) = Regex::new(&search_query) {
246                for line in &lines {
247                    if apply_search(&search_regex, line) {
248                        search_lines.push(line.clone());
249                    }
250                }
251            }
252        }
253
254        (lines, search_lines)
255    }
256
257    /// Helper function to run log sources
258    fn run_log_source(&self, log_source: Arc<Box<dyn LogSource + Send + Sync>>) {
259        let sender = self.log_sender.clone();
260
261        std::thread::Builder::new()
262            .name(log_source.get_address())
263            .spawn(|| {
264                async_std::task::spawn(async move {
265                    log_source.run(sender).await.unwrap();
266                });
267            })
268            .unwrap();
269    }
270}
271
272impl LogAnalyzer for LogService {
273    fn add_log(
274        &self,
275        source_type: usize,
276        source_address: &str,
277        format: Option<&String>,
278    ) -> Result<()> {
279        let log_store = self.log_store.clone();
280
281        let source_type = SourceType::try_from(source_type).unwrap();
282
283        let log_source = Arc::new(async_std::task::block_on(create_source(
284            source_type,
285            source_address.to_string(),
286        ))?);
287        log_store.add_log(source_address, log_source.clone(), format, true);
288        self.run_log_source(log_source);
289
290        Ok(())
291    }
292
293    fn add_format(&self, alias: &str, regex: &str) -> Result<()> {
294        let format = Format::new(alias, regex)?;
295
296        self.processing_store.add_format(format.alias, format.regex);
297        Ok(())
298    }
299
300    fn add_search(&self, regex: &str) {
301        let re = Regex::new(regex);
302        self.analysis_store.reset_search();
303
304        if re.is_ok() {
305            self.analysis_store.add_search_query(regex);
306
307            let analysis_store = self.analysis_store.clone();
308            let regex_str = regex.to_string();
309            let sender = self.event_channel.clone();
310
311            std::thread::Builder::new()
312                .name("Search".to_string())
313                .spawn(move || {
314                    let log = analysis_store.fetch_log();
315
316                    if !log.is_empty() {
317                        sender.send(Event::Searching).unwrap_or_default();
318                        scope(|scope| {
319                            let num_cpus = num_cpus::get();
320                            let chunk_size = log.len() / num_cpus;
321                            let search_lines: Vec<LogLine> = log
322                                .chunks(chunk_size.max(num_cpus))
323                                .parallel_map_scoped(scope, move |chunk| {
324                                    let lines = chunk.to_owned();
325                                    let r = Regex::new(&regex_str).unwrap();
326                                    let mut v: Vec<LogLine> = Vec::with_capacity(lines.len());
327
328                                    for log_line in lines {
329                                        if apply_search(&r, &log_line) {
330                                            v.push(log_line);
331                                        };
332                                    }
333
334                                    v
335                                })
336                                .flatten()
337                                .collect::<Vec<LogLine>>();
338                            analysis_store.add_search_lines(&search_lines);
339                        })
340                        .unwrap();
341                        sender.send(Event::SearchFinished).unwrap_or_default();
342                    }
343                })
344                .unwrap();
345        }
346    }
347
348    fn add_filter(&self, filter: Filter) {
349        self.processing_store
350            .add_filter(filter.alias, filter.filter, filter.action, false);
351    }
352
353    fn get_log_lines(&self, from: usize, to: usize) -> Vec<LogLine> {
354        self.analysis_store.get_log_lines(from, to)
355    }
356
357    fn get_search_lines(&self, from: usize, to: usize) -> Vec<LogLineStyled> {
358        let search_lines_containing = self.analysis_store.get_search_lines(from, to);
359        let mut styled_search_lines = vec![];
360
361        if !search_lines_containing.is_empty() {
362            // If there are search lines we are sure that there is a valid search query
363            let query = Regex::new(&self.analysis_store.get_search_query().unwrap()).unwrap();
364            styled_search_lines = search_lines_containing
365                .into_iter()
366                .map(|l| format_search(&query, &l))
367                .collect();
368        }
369
370        styled_search_lines
371    }
372
373    fn get_log_lines_containing(
374        &self,
375        index: usize,
376        elements: usize,
377    ) -> (Vec<LogLine>, usize, usize) {
378        self.analysis_store
379            .get_log_lines_containing(index, elements)
380    }
381
382    fn get_search_lines_containing(
383        &self,
384        index: usize,
385        elements: usize,
386    ) -> (Vec<LogLineStyled>, usize, usize) {
387        let search_lines_containing = self
388            .analysis_store
389            .get_search_lines_containing(index, elements);
390
391        let mut styled_search_lines: (Vec<LogLineStyled>, usize, usize) =
392            (vec![], search_lines_containing.1, search_lines_containing.2);
393
394        if !search_lines_containing.0.is_empty() {
395            // If there are search lines we are sure that there is a valid search query
396            let query = Regex::new(&self.analysis_store.get_search_query().unwrap()).unwrap();
397            styled_search_lines.0 = search_lines_containing
398                .0
399                .into_iter()
400                .map(|l| format_search(&query, &l))
401                .collect();
402        }
403
404        styled_search_lines
405    }
406
407    fn get_logs(&self) -> Vec<(bool, String, Option<String>)> {
408        self.log_store.get_logs()
409    }
410
411    fn get_formats(&self) -> Vec<Format> {
412        self.processing_store.get_formats()
413    }
414
415    fn get_filters(&self) -> Vec<(bool, Filter)> {
416        self.processing_store.get_filters()
417    }
418
419    fn get_total_raw_lines(&self) -> usize {
420        self.log_store.get_total_lines()
421    }
422
423    fn get_total_filtered_lines(&self) -> usize {
424        self.analysis_store.get_total_filtered_lines()
425    }
426
427    fn get_total_searched_lines(&self) -> usize {
428        self.analysis_store.get_total_searched_lines()
429    }
430
431    fn toggle_source(&self, id: &str) {
432        if let Some((enabled, _log, _format)) = self
433            .log_store
434            .get_logs()
435            .into_iter()
436            .find(|(_, log_id, _)| log_id == id)
437        {
438            if let Some(source) = self.log_store.get_source(id) {
439                self.log_store.toggle_log(id);
440                // If enabled -> disable
441                if enabled {
442                    source.stop();
443                } else {
444                    self.run_log_source(source);
445                }
446            }
447        }
448    }
449
450    fn toggle_filter(&self, id: &str) {
451        self.processing_store.toggle_filter(id);
452
453        // Reset everything because we need to recompute the log from the raw lines
454        self.analysis_store.reset_log();
455        self.analysis_store.reset_search();
456
457        let mut receiver = self.event_channel.subscribe();
458
459        let enabled_logs: Vec<String> = self
460            .log_store
461            .get_logs()
462            .into_iter()
463            .filter(|(enabled, _, _)| *enabled)
464            .map(|(_, id, _)| id)
465            .collect();
466
467        let log_store = self.log_store.clone();
468        let sender = self.log_sender.clone();
469        let event_sender = self.event_channel.clone();
470
471        std::thread::Builder::new()
472            .name("Toggle filter".to_string())
473            .spawn(move || {
474                for log in enabled_logs {
475                    let lines = log_store.extract_lines(&log);
476
477                    if lines.is_empty() {
478                        event_sender.send(Event::FilterFinished).unwrap();
479                        continue;
480                    }
481
482                    event_sender.send(Event::Filtering).unwrap();
483                    sender.send((log.clone(), lines.to_vec())).unwrap();
484
485                    while !matches!(
486                        async_std::task::block_on(receiver.recv()).unwrap_or(Event::Filtering),
487                        Event::NewLines(_, last) if last == (lines.len() - 1)
488                    ) {
489                        std::thread::sleep(Duration::from_millis(100));
490                    }
491                    event_sender.send(Event::FilterFinished).unwrap();
492                }
493            })
494            .unwrap();
495    }
496
497    fn on_event(&self) -> broadcast::Receiver<Event> {
498        self.event_channel.subscribe()
499    }
500}