arcs_logging_rs/
webwriter.rs

1use std::{sync::{RwLock, atomic::AtomicUsize}, thread};
2
3use reqwest::blocking::Client;
4use url::Url;
5
6use std::io::{ Result as IoResult, Error as IoError };
7
8const BUFFER_LINES: usize = 1;
9
10
11
12
13#[derive(Debug)]
14pub struct WebWriter {
15    url: Url,
16    request_client: Client,
17
18    line_buffer: RwLock<[Vec<u8>; BUFFER_LINES]>,
19    lines_buffered: AtomicUsize,
20}
21
22impl WebWriter {
23    pub fn new(url: Url) -> Self {
24        Self {
25            url,
26            request_client: Client::new(),
27
28            line_buffer: RwLock::new(std::array::from_fn(|_| Vec::new())),
29            lines_buffered: AtomicUsize::new(0),
30        }
31    }
32
33    pub fn add_line(&self, line: &[u8]) -> IoResult<usize> {
34        let mut buffer = self.line_buffer
35            .write()
36            .map_err(|e| {
37                IoError::new(
38                    std::io::ErrorKind::Other,
39                    format!("Failed to get access to buffer: {}", e),
40                )
41            })?;
42        
43        let Ok(idx) = self.lines_buffered.fetch_update(
44            std::sync::atomic::Ordering::SeqCst,
45            std::sync::atomic::Ordering::SeqCst,
46            |x| {
47                if x + 1 == BUFFER_LINES {
48                    Some(0)
49                } else {
50                    Some(x + 1)
51                }
52            }
53        ) else {
54            return Err(IoError::new(
55                std::io::ErrorKind::Other,
56                "Failed to update line count",
57            ));
58        };
59        buffer[idx] = line.strip_suffix(b"\n").unwrap_or(line).to_vec();
60
61        if idx + 1 == BUFFER_LINES {
62            let body: Vec<u8> = buffer.iter().flat_map(|line| [
63                line.as_slice(),
64                b"\n",
65            ]).flatten().copied().collect();
66
67            let Ok(request) = self.request_client.post(self.url.clone()).body(body).build() else {
68                return Err(IoError::new(
69                    std::io::ErrorKind::Other,
70                    "Failed to build request",
71                ));
72            };
73            let client = self.request_client.clone();
74
75            let Ok(_) = thread::Builder::new().spawn(move || client.execute(request)) else {
76                return Err(IoError::new(
77                    std::io::ErrorKind::Other,
78                    "Failed to spawn thread",
79                ));
80            };
81        }
82        drop(buffer);
83
84        Ok(line.len())
85        
86    }
87
88    pub fn flush(&self) -> IoResult<()> {
89        let buffer = self.line_buffer
90            .read()
91            .map_err(|e| {
92                IoError::new(
93                    std::io::ErrorKind::Other,
94                    format!("Failed to get access to buffer: {}", e),
95                )
96            })?;
97        
98        let count = self.lines_buffered.swap(0, std::sync::atomic::Ordering::SeqCst);
99
100        let body: Vec<u8> = buffer.iter().take(count).flat_map(|line| [
101            line.as_slice(),
102            b"\n",
103        ]).flatten().copied().collect();
104
105        let Ok(request) = self.request_client.post(self.url.clone()).body(body).build() else {
106            return Err(IoError::new(
107                std::io::ErrorKind::Other,
108                "Failed to build request",
109            ));
110        };
111        let client = self.request_client.clone();
112
113        let Ok(_) = thread::Builder::new().spawn(move || client.execute(request)) else {
114            return Err(IoError::new(
115                std::io::ErrorKind::Other,
116                "Failed to spawn thread",
117            ));
118        };
119
120        Ok(())
121    }
122}
123
124impl Clone for WebWriter {
125    fn clone(&self) -> Self {
126        Self {
127            url: self.url.clone(),
128            request_client: self.request_client.clone(),
129
130            line_buffer: RwLock::new(
131                self.line_buffer
132                    .read()
133                    .map(|arr| (*arr).clone())
134                    .unwrap_or(std::array::from_fn(|_| Vec::new()))
135            ),
136            lines_buffered: AtomicUsize::new(self.lines_buffered.load(std::sync::atomic::Ordering::Relaxed)),
137        }
138    }
139}
140