log_source/source/
log_source.rs

1
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use anyhow::{anyhow, Result};
6
7use async_std::net::TcpStream;
8use async_std::{
9    fs::File,
10    io::{prelude::BufReadExt, BufReader},
11    prelude::StreamExt,
12};
13use async_trait::async_trait;
14use flume::Sender;
15use parking_lot::RwLock;
16
17
18#[derive(Eq, PartialEq)]
19pub enum SourceType {
20    FILE,
21    WS,
22}
23
24impl TryFrom<usize> for SourceType {
25    type Error = ();
26
27    fn try_from(value: usize) -> Result<Self, Self::Error> {
28        match value {
29            0 => Ok(SourceType::FILE),
30            1 => Ok(SourceType::WS),
31            _ => Err(()),
32        }
33    }
34}
35
36impl From<SourceType> for usize {
37    fn from(val: SourceType) -> Self {
38        match val {
39            SourceType::FILE => 0,
40            SourceType::WS => 1,
41        }
42    }
43}
44
45async fn is_file_path_valid(path: &String) -> bool {
46    File::open(&path).await.is_ok()
47}
48
49pub async fn create_source(
50    source: SourceType,
51    source_address: String,
52) -> Result<Box<dyn LogSource + Send + Sync>> {
53    match source {
54        SourceType::FILE => match is_file_path_valid(&source_address).await {
55            true => Ok(Box::new(FileSource {
56                path: source_address,
57                read_lines: RwLock::new(0),
58                enabled: AtomicBool::new(true)
59            })),
60            false => Err(anyhow!(
61                "Could not open file.\nPlease ensure that path is correct"
62            )),
63        },
64        SourceType::WS => Ok(Box::new(WsSource {
65            address: source_address,
66            enabled: AtomicBool::new(true)
67        })),
68    }
69}
70
71#[async_trait]
72pub trait LogSource {
73    async fn run(&self, sender: Sender<(String, Vec<String>)>) -> Result<()>;
74    fn stop(&self);
75    fn get_address(&self) -> String;
76}
77
78pub struct FileSource {
79    path: String,
80    read_lines: RwLock<usize>,
81    enabled: AtomicBool
82}
83
84#[async_trait]
85impl LogSource for FileSource {
86    async fn run(&self, sender: Sender<(String, Vec<String>)>) -> Result<()> {
87        let capacity = 1_000_000_usize;
88        while self.enabled.load(Ordering::Relaxed) {
89            let file = File::open(&self.path).await;
90            match file {
91                Ok(f) => {
92                    let reader = BufReader::with_capacity(2_usize.pow(26), f);
93                    let mut v = Vec::with_capacity(capacity);
94                    let mut lines = reader.lines().skip(*self.read_lines.read());
95                    while let Some(line) = lines.next().await {
96                        v.push(line?);
97                        if v.len() >= capacity - 1 {
98                            sender.send_async((self.path.clone(), v)).await?;
99                            v = Vec::with_capacity(capacity);
100                        }
101                        *self.read_lines.write() += 1;
102                    }
103                    sender.send((self.path.clone(), v))?;
104                }
105                Err(_) => break,
106            }
107
108            async_std::task::sleep(Duration::from_millis(300)).await;
109        }
110        // restore after quitting
111        self.enabled.store(true, Ordering::Relaxed);
112        Ok(())
113    }
114
115    fn stop(&self) {
116        self.enabled.store(false, Ordering::Relaxed);
117    }
118
119    fn get_address(&self) -> String {
120        self.path.clone()
121    }
122
123}
124
125pub struct WsSource {
126    address: String,
127    enabled: AtomicBool
128}
129
130#[async_trait]
131impl LogSource for WsSource {
132    async fn run(&self, sender: Sender<(String, Vec<String>)>) -> Result<()> {
133        while self.enabled.load(Ordering::Relaxed) {
134            let stream = match TcpStream::connect(&self.address).await {
135                Ok(stream) => Some(stream),
136                Err(_) => None,
137            };
138            if let Some(stream) = stream {
139                while self.enabled.load(Ordering::Relaxed) {
140                    let mut lines_from_server = BufReader::new(&stream).lines().fuse();
141                    match lines_from_server.next().await {
142                        Some(line) => {
143                            let line = line?;
144                            sender.send((self.address.clone(), vec![line]))?;
145                        }
146                        None => break,
147                    }
148                }
149            }
150            async_std::task::sleep(Duration::from_secs(3)).await;
151        }
152        // restore after quitting
153        self.enabled.store(true, Ordering::Relaxed);
154        Ok(())
155    }
156
157    fn stop(&self) {
158        self.enabled.store(false, Ordering::Relaxed);
159    }
160
161    fn get_address(&self) -> String {
162        self.address.clone()
163    }
164}