log_source/source/
log_source.rs1
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use anyhow::{anyhow, Result};
6
7use async_std::net::TcpStream;
8use async_std::{
9 fs::File,
10 io::{prelude::BufReadExt, BufReader},
11 prelude::StreamExt,
12};
13use async_trait::async_trait;
14use flume::Sender;
15use parking_lot::RwLock;
16
17
18#[derive(Eq, PartialEq)]
19pub enum SourceType {
20 FILE,
21 WS,
22}
23
24impl TryFrom<usize> for SourceType {
25 type Error = ();
26
27 fn try_from(value: usize) -> Result<Self, Self::Error> {
28 match value {
29 0 => Ok(SourceType::FILE),
30 1 => Ok(SourceType::WS),
31 _ => Err(()),
32 }
33 }
34}
35
36impl From<SourceType> for usize {
37 fn from(val: SourceType) -> Self {
38 match val {
39 SourceType::FILE => 0,
40 SourceType::WS => 1,
41 }
42 }
43}
44
45async fn is_file_path_valid(path: &String) -> bool {
46 File::open(&path).await.is_ok()
47}
48
49pub async fn create_source(
50 source: SourceType,
51 source_address: String,
52) -> Result<Box<dyn LogSource + Send + Sync>> {
53 match source {
54 SourceType::FILE => match is_file_path_valid(&source_address).await {
55 true => Ok(Box::new(FileSource {
56 path: source_address,
57 read_lines: RwLock::new(0),
58 enabled: AtomicBool::new(true)
59 })),
60 false => Err(anyhow!(
61 "Could not open file.\nPlease ensure that path is correct"
62 )),
63 },
64 SourceType::WS => Ok(Box::new(WsSource {
65 address: source_address,
66 enabled: AtomicBool::new(true)
67 })),
68 }
69}
70
71#[async_trait]
72pub trait LogSource {
73 async fn run(&self, sender: Sender<(String, Vec<String>)>) -> Result<()>;
74 fn stop(&self);
75 fn get_address(&self) -> String;
76}
77
78pub struct FileSource {
79 path: String,
80 read_lines: RwLock<usize>,
81 enabled: AtomicBool
82}
83
84#[async_trait]
85impl LogSource for FileSource {
86 async fn run(&self, sender: Sender<(String, Vec<String>)>) -> Result<()> {
87 let capacity = 1_000_000_usize;
88 while self.enabled.load(Ordering::Relaxed) {
89 let file = File::open(&self.path).await;
90 match file {
91 Ok(f) => {
92 let reader = BufReader::with_capacity(2_usize.pow(26), f);
93 let mut v = Vec::with_capacity(capacity);
94 let mut lines = reader.lines().skip(*self.read_lines.read());
95 while let Some(line) = lines.next().await {
96 v.push(line?);
97 if v.len() >= capacity - 1 {
98 sender.send_async((self.path.clone(), v)).await?;
99 v = Vec::with_capacity(capacity);
100 }
101 *self.read_lines.write() += 1;
102 }
103 sender.send((self.path.clone(), v))?;
104 }
105 Err(_) => break,
106 }
107
108 async_std::task::sleep(Duration::from_millis(300)).await;
109 }
110 self.enabled.store(true, Ordering::Relaxed);
112 Ok(())
113 }
114
115 fn stop(&self) {
116 self.enabled.store(false, Ordering::Relaxed);
117 }
118
119 fn get_address(&self) -> String {
120 self.path.clone()
121 }
122
123}
124
125pub struct WsSource {
126 address: String,
127 enabled: AtomicBool
128}
129
130#[async_trait]
131impl LogSource for WsSource {
132 async fn run(&self, sender: Sender<(String, Vec<String>)>) -> Result<()> {
133 while self.enabled.load(Ordering::Relaxed) {
134 let stream = match TcpStream::connect(&self.address).await {
135 Ok(stream) => Some(stream),
136 Err(_) => None,
137 };
138 if let Some(stream) = stream {
139 while self.enabled.load(Ordering::Relaxed) {
140 let mut lines_from_server = BufReader::new(&stream).lines().fuse();
141 match lines_from_server.next().await {
142 Some(line) => {
143 let line = line?;
144 sender.send((self.address.clone(), vec![line]))?;
145 }
146 None => break,
147 }
148 }
149 }
150 async_std::task::sleep(Duration::from_secs(3)).await;
151 }
152 self.enabled.store(true, Ordering::Relaxed);
154 Ok(())
155 }
156
157 fn stop(&self) {
158 self.enabled.store(false, Ordering::Relaxed);
159 }
160
161 fn get_address(&self) -> String {
162 self.address.clone()
163 }
164}