1use std::ops::Range;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::Result;
6use flume::Sender;
7use log_source::source::log_source::{create_source, LogSource, SourceType};
8use regex::Regex;
9use tokio::sync::broadcast;
10
11use pariter::{scope, IteratorExt as _};
12
13use crate::domain::apply_filters::apply_filters;
14use crate::domain::apply_format::apply_format;
15use crate::domain::apply_search::{apply_search, format_search};
16use crate::models::filter::LogFilter;
17use crate::models::log_line_styled::LogLineStyled;
18use crate::models::{filter::Filter, format::Format, log_line::LogLine};
19use crate::stores::analysis_store::AnalysisStore;
20use crate::stores::log_store::LogStore;
21use crate::stores::processing_store::ProcessingStore;
22
/// Notifications broadcast by `LogService` while it ingests, filters and searches logs.
///
/// Where a variant carries two `usize` values, they are the first and last line index of the
/// batch the notification refers to.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Event {
    /// A batch of raw lines has been received and is about to be formatted, filtered and searched.
    Processing(usize, usize),
    /// The filtered lines of a batch have been added to the analysis store.
    NewLines(usize, usize),
    /// The search matches of a batch have been added to the analysis store.
    NewSearchLines(usize, usize),
    /// The lines of an enabled log are being re-processed after a filter was toggled.
    Filtering,
    /// Re-processing of one log after a filter toggle has finished.
    FilterFinished,
    /// A background search over the filtered log has started.
    Searching,
    /// The background search has finished and its matches have been stored.
    SearchFinished,
}
41
42pub trait LogAnalyzer {
44 fn add_log(
46 &self,
47 source_type: usize,
48 source_address: &str,
49 format: Option<&String>,
50 ) -> Result<()>;
51 fn add_format(&self, alias: &str, regex: &str) -> Result<()>;
53 fn add_search(&self, regex: &str);
55 fn add_filter(&self, filter: Filter);
57 fn get_log_lines(&self, from: usize, to: usize) -> Vec<LogLine>;
59 fn get_search_lines(&self, from: usize, to: usize) -> Vec<LogLineStyled>;
61 fn get_log_lines_containing(
64 &self,
65 index: usize,
66 elements: usize,
67 ) -> (Vec<LogLine>, usize, usize);
68
69 fn get_search_lines_containing(
72 &self,
73 index: usize,
74 elements: usize,
75 ) -> (Vec<LogLineStyled>, usize, usize);
76
77 fn get_logs(&self) -> Vec<(bool, String, Option<String>)>;
80
81 fn get_formats(&self) -> Vec<Format>;
83 fn get_filters(&self) -> Vec<(bool, Filter)>;
85 fn get_total_raw_lines(&self) -> usize;
87 fn get_total_filtered_lines(&self) -> usize;
89 fn get_total_searched_lines(&self) -> usize;
91 fn toggle_source(&self, id: &str);
93 fn toggle_filter(&self, id: &str);
95 fn on_event(&self) -> broadcast::Receiver<Event>;
96}
97
98pub struct LogService {
99 log_store: Arc<dyn LogStore + Sync + Send>,
100 processing_store: Arc<dyn ProcessingStore + Sync + Send>,
101 analysis_store: Arc<dyn AnalysisStore + Sync + Send>,
102 log_sender: Sender<(String, Vec<String>)>,
103 event_channel: broadcast::Sender<Event>,
104}
105
106impl LogService {
107 pub fn new(
115 log_store: Arc<dyn LogStore + Sync + Send>,
116 processing_store: Arc<dyn ProcessingStore + Sync + Send>,
117 analysis_store: Arc<dyn AnalysisStore + Sync + Send>,
118 ) -> Arc<Self> {
119 let (sender, receiver) = flume::bounded(1_000_000_usize);
120 let (broadcast_sender, _broadcast_receiver) = broadcast::channel(1_000_000_usize);
121
122 let log_service = Arc::new(Self {
123 log_store,
124 processing_store,
125 analysis_store,
126 log_sender: sender,
127 event_channel: broadcast_sender,
128 });
129
130 let log = log_service.clone();
131 let event_sender = log_service.event_channel.clone();
132 std::thread::Builder::new()
133 .name("Consumer".to_string())
134 .spawn(move || loop {
135 let num_cpus = num_cpus::get();
136 while let Ok((path, lines)) = receiver.recv() {
137 let (format, indexes, lines) = log.process_raw_lines(&path, lines);
138
139 if !lines.is_empty() {
140 let chunk_size = lines.len() / num_cpus;
141
142 let elements: Vec<(String, usize)> = lines
143 .into_iter()
144 .zip(indexes)
145 .map(|(line, index)| (line, index))
146 .collect();
147
148 let first_index = elements[0].1;
149 let last_index = elements.last().unwrap().1;
150 event_sender
151 .send(Event::Processing(first_index, last_index))
152 .unwrap_or_default();
153
154 scope(|scope| {
155 let processed: Vec<(Vec<LogLine>, Vec<LogLine>)> = elements
157 .chunks(chunk_size.max(num_cpus))
158 .parallel_map_scoped(scope, |chunk| {
159 let lines = log.apply_format(&format, &path, chunk);
160 let filtered_lines = log.apply_filters(lines);
161 let (filtered, search) = log.apply_search(filtered_lines);
162 (filtered, search)
163 })
164 .collect();
165
166 for (filtered, search) in processed {
168 log.analysis_store.add_lines(&filtered);
169 log.analysis_store.add_search_lines(&search);
170 }
171
172 event_sender
174 .send(Event::NewLines(first_index, last_index))
175 .unwrap_or_default();
176 event_sender
177 .send(Event::NewSearchLines(first_index, last_index))
178 .unwrap_or_default();
179 })
180 .unwrap();
181 }
182 }
183 })
184 .unwrap();
185
186 log_service
187 }
188
189 fn process_raw_lines(
191 &self,
192 path: &str,
193 lines: Vec<String>,
194 ) -> (Option<String>, Range<usize>, Vec<String>) {
195 let indexes = self.log_store.add_lines(path, &lines);
196 let format = self.log_store.get_format(path);
197 (format, indexes, lines)
198 }
199
200 fn apply_format(
202 &self,
203 format: &Option<String>,
204 path: &str,
205 line_index: &[(String, usize)],
206 ) -> Vec<LogLine> {
207 let mut format_regex = None;
208
209 if let Some(format) = format {
210 let format = self.processing_store.get_format(format);
211 format_regex = format.map(|format| Regex::new(&format).unwrap());
212 }
213
214 let mut log_lines: Vec<LogLine> = Vec::with_capacity(line_index.len());
215 for (line, index) in line_index {
216 let log_line = apply_format(&format_regex.as_ref(), path, line, *index);
217 log_lines.push(log_line);
218 }
219 log_lines
220 }
221
222 fn apply_filters(&self, lines: Vec<LogLine>) -> Vec<LogLine> {
224 let filters: Vec<LogFilter> = self
225 .processing_store
226 .get_filters()
227 .into_iter()
228 .filter(|(enabled, _)| *enabled)
229 .map(|(_, filter)| filter.into())
230 .collect();
231
232 let mut filtered_lines: Vec<LogLine> = Vec::with_capacity(lines.len());
233 for line in lines {
234 if let Some(filtered_line) = apply_filters(&filters, line) {
235 filtered_lines.push(filtered_line);
236 }
237 }
238 filtered_lines
239 }
240
241 fn apply_search(&self, lines: Vec<LogLine>) -> (Vec<LogLine>, Vec<LogLine>) {
243 let mut search_lines: Vec<LogLine> = Vec::with_capacity(lines.len());
244 if let Some(search_query) = self.analysis_store.get_search_query() {
245 if let Ok(search_regex) = Regex::new(&search_query) {
246 for line in &lines {
247 if apply_search(&search_regex, line) {
248 search_lines.push(line.clone());
249 }
250 }
251 }
252 }
253
254 (lines, search_lines)
255 }
256
257 fn run_log_source(&self, log_source: Arc<Box<dyn LogSource + Send + Sync>>) {
259 let sender = self.log_sender.clone();
260
261 std::thread::Builder::new()
262 .name(log_source.get_address())
263 .spawn(|| {
264 async_std::task::spawn(async move {
265 log_source.run(sender).await.unwrap();
266 });
267 })
268 .unwrap();
269 }
270}
271
272impl LogAnalyzer for LogService {
273 fn add_log(
274 &self,
275 source_type: usize,
276 source_address: &str,
277 format: Option<&String>,
278 ) -> Result<()> {
279 let log_store = self.log_store.clone();
280
281 let source_type = SourceType::try_from(source_type).unwrap();
282
283 let log_source = Arc::new(async_std::task::block_on(create_source(
284 source_type,
285 source_address.to_string(),
286 ))?);
287 log_store.add_log(source_address, log_source.clone(), format, true);
288 self.run_log_source(log_source);
289
290 Ok(())
291 }
292
293 fn add_format(&self, alias: &str, regex: &str) -> Result<()> {
294 let format = Format::new(alias, regex)?;
295
296 self.processing_store.add_format(format.alias, format.regex);
297 Ok(())
298 }
299
300 fn add_search(&self, regex: &str) {
301 let re = Regex::new(regex);
302 self.analysis_store.reset_search();
303
304 if re.is_ok() {
305 self.analysis_store.add_search_query(regex);
306
307 let analysis_store = self.analysis_store.clone();
308 let regex_str = regex.to_string();
309 let sender = self.event_channel.clone();
310
311 std::thread::Builder::new()
312 .name("Search".to_string())
313 .spawn(move || {
314 let log = analysis_store.fetch_log();
315
316 if !log.is_empty() {
317 sender.send(Event::Searching).unwrap_or_default();
318 scope(|scope| {
319 let num_cpus = num_cpus::get();
320 let chunk_size = log.len() / num_cpus;
321 let search_lines: Vec<LogLine> = log
322 .chunks(chunk_size.max(num_cpus))
323 .parallel_map_scoped(scope, move |chunk| {
324 let lines = chunk.to_owned();
325 let r = Regex::new(®ex_str).unwrap();
326 let mut v: Vec<LogLine> = Vec::with_capacity(lines.len());
327
328 for log_line in lines {
329 if apply_search(&r, &log_line) {
330 v.push(log_line);
331 };
332 }
333
334 v
335 })
336 .flatten()
337 .collect::<Vec<LogLine>>();
338 analysis_store.add_search_lines(&search_lines);
339 })
340 .unwrap();
341 sender.send(Event::SearchFinished).unwrap_or_default();
342 }
343 })
344 .unwrap();
345 }
346 }
347
348 fn add_filter(&self, filter: Filter) {
349 self.processing_store
350 .add_filter(filter.alias, filter.filter, filter.action, false);
351 }
352
353 fn get_log_lines(&self, from: usize, to: usize) -> Vec<LogLine> {
354 self.analysis_store.get_log_lines(from, to)
355 }
356
357 fn get_search_lines(&self, from: usize, to: usize) -> Vec<LogLineStyled> {
358 let search_lines_containing = self.analysis_store.get_search_lines(from, to);
359 let mut styled_search_lines = vec![];
360
361 if !search_lines_containing.is_empty() {
362 let query = Regex::new(&self.analysis_store.get_search_query().unwrap()).unwrap();
364 styled_search_lines = search_lines_containing
365 .into_iter()
366 .map(|l| format_search(&query, &l))
367 .collect();
368 }
369
370 styled_search_lines
371 }
372
373 fn get_log_lines_containing(
374 &self,
375 index: usize,
376 elements: usize,
377 ) -> (Vec<LogLine>, usize, usize) {
378 self.analysis_store
379 .get_log_lines_containing(index, elements)
380 }
381
382 fn get_search_lines_containing(
383 &self,
384 index: usize,
385 elements: usize,
386 ) -> (Vec<LogLineStyled>, usize, usize) {
387 let search_lines_containing = self
388 .analysis_store
389 .get_search_lines_containing(index, elements);
390
391 let mut styled_search_lines: (Vec<LogLineStyled>, usize, usize) =
392 (vec![], search_lines_containing.1, search_lines_containing.2);
393
394 if !search_lines_containing.0.is_empty() {
395 let query = Regex::new(&self.analysis_store.get_search_query().unwrap()).unwrap();
397 styled_search_lines.0 = search_lines_containing
398 .0
399 .into_iter()
400 .map(|l| format_search(&query, &l))
401 .collect();
402 }
403
404 styled_search_lines
405 }
406
407 fn get_logs(&self) -> Vec<(bool, String, Option<String>)> {
408 self.log_store.get_logs()
409 }
410
411 fn get_formats(&self) -> Vec<Format> {
412 self.processing_store.get_formats()
413 }
414
415 fn get_filters(&self) -> Vec<(bool, Filter)> {
416 self.processing_store.get_filters()
417 }
418
419 fn get_total_raw_lines(&self) -> usize {
420 self.log_store.get_total_lines()
421 }
422
423 fn get_total_filtered_lines(&self) -> usize {
424 self.analysis_store.get_total_filtered_lines()
425 }
426
427 fn get_total_searched_lines(&self) -> usize {
428 self.analysis_store.get_total_searched_lines()
429 }
430
431 fn toggle_source(&self, id: &str) {
432 if let Some((enabled, _log, _format)) = self
433 .log_store
434 .get_logs()
435 .into_iter()
436 .find(|(_, log_id, _)| log_id == id)
437 {
438 if let Some(source) = self.log_store.get_source(id) {
439 self.log_store.toggle_log(id);
440 if enabled {
442 source.stop();
443 } else {
444 self.run_log_source(source);
445 }
446 }
447 }
448 }
449
450 fn toggle_filter(&self, id: &str) {
451 self.processing_store.toggle_filter(id);
452
453 self.analysis_store.reset_log();
455 self.analysis_store.reset_search();
456
457 let mut receiver = self.event_channel.subscribe();
458
459 let enabled_logs: Vec<String> = self
460 .log_store
461 .get_logs()
462 .into_iter()
463 .filter(|(enabled, _, _)| *enabled)
464 .map(|(_, id, _)| id)
465 .collect();
466
467 let log_store = self.log_store.clone();
468 let sender = self.log_sender.clone();
469 let event_sender = self.event_channel.clone();
470
471 std::thread::Builder::new()
472 .name("Toggle filter".to_string())
473 .spawn(move || {
474 for log in enabled_logs {
475 let lines = log_store.extract_lines(&log);
476
477 if lines.is_empty() {
478 event_sender.send(Event::FilterFinished).unwrap();
479 continue;
480 }
481
482 event_sender.send(Event::Filtering).unwrap();
483 sender.send((log.clone(), lines.to_vec())).unwrap();
484
485 while !matches!(
486 async_std::task::block_on(receiver.recv()).unwrap_or(Event::Filtering),
487 Event::NewLines(_, last) if last == (lines.len() - 1)
488 ) {
489 std::thread::sleep(Duration::from_millis(100));
490 }
491 event_sender.send(Event::FilterFinished).unwrap();
492 }
493 })
494 .unwrap();
495 }
496
497 fn on_event(&self) -> broadcast::Receiver<Event> {
498 self.event_channel.subscribe()
499 }
500}