logged_stream/
stream.rs

1use crate::buffer_formatter::BufferFormatter;
2use crate::logger::Logger;
3use crate::record::Record;
4use crate::record::RecordKind;
5use crate::ChannelLogger;
6use crate::MemoryStorageLogger;
7use crate::RecordFilter;
8use std::collections;
9use std::fmt;
10use std::io;
11use std::pin::Pin;
12use std::sync::mpsc;
13use std::task::Context;
14use std::task::Poll;
15use tokio::io as tokio_io;
16
17/// Wrapper for IO objects to log all read and write operations, errors, and drop events.
18///
19/// This structure can be used as a wrapper for underlying IO objects that implement the [`Read`] and [`Write`] traits,
20/// or their asynchronous analogues from the [`tokio`] library, [`AsyncRead`] and [`AsyncWrite`]. It enables logging
21/// of all read and write operations, errors, and drop events.
22///
23/// [`LoggedStream`] structure constructs from four parts:
24///
25/// -   Underlying IO object, which must implement [`Write`] and [`Read`] traits or their
26///     asynchronous analogues from [`tokio`] library: [`AsyncRead`] and [`AsyncWrite`].
27/// -   Buffer formatting part, which must implement [`BufferFormatter`] trait provided by this library.
28///     This part of [`LoggedStream`] is responsible for the form you will see the input and
29///     output bytes. Currently this library provides the following implementations of [`BufferFormatter`] trait:
30///     [`LowercaseHexadecimalFormatter`], [`UppercaseHexadecimalFormatter`], [`DecimalFormatter`],
31///     [`BinaryFormatter`] and [`OctalFormatter`]. Also [`BufferFormatter`] is public trait so you are
32///     free to construct your own implementation.
33/// -   Filtering part, which must implement [`RecordFilter`] trait provide by this library.
34///     This part of [`LoggedStream`] is responsible for log records filtering. Currently this library
35///     provides the following implementation of [`RecordFilter`] trait: [`DefaultFilter`] which accepts
36///     all log records and [`RecordKindFilter`] which accepts logs with kinds specified during construct.
37///     Also [`RecordFilter`] is public trait and you are free to construct your own implementation.
38/// -   Logging part, which must implement [`Logger`] trait provided by this library. This part
39///     of [`LoggedStream`] is responsible for further work with constructed, formatter and filtered
40///     log record. For example, it can be outputted to console, written to the file, written to database,
41///     written to the memory for further use or sended by the channel. Currently this library provides
42///     the following implementations of [`Logger`] trait: [`ConsoleLogger`], [`MemoryStorageLogger`],
43///     [`ChannelLogger`] and [`FileLogger`]. Also [`Logger`] is public trait and you are free to construct
44///     your own implementation.
45///
46/// [`Read`]: io::Read
47/// [`Write`]: io::Write
48/// [`AsyncRead`]: tokio::io::AsyncRead
49/// [`AsyncWrite`]: tokio::io::AsyncWrite
50/// [`LowercaseHexadecimalFormatter`]: crate::LowercaseHexadecimalFormatter
51/// [`UppercaseHexadecimalFormatter`]: crate::UppercaseHexadecimalFormatter
52/// [`DecimalFormatter`]: crate::DecimalFormatter
53/// [`BinaryFormatter`]: crate::BinaryFormatter
54/// [`OctalFormatter`]: crate::OctalFormatter
55/// [`DefaultFilter`]: crate::DefaultFilter
56/// [`RecordKindFilter`]: crate::RecordKindFilter
57/// [`ConsoleLogger`]: crate::ConsoleLogger
58/// [`FileLogger`]: crate::FileLogger
59pub struct LoggedStream<
60    S: 'static,
61    Formatter: 'static,
62    Filter: RecordFilter + 'static,
63    L: Logger + 'static,
64> {
65    inner_stream: S,
66    formatter: Formatter,
67    filter: Filter,
68    logger: L,
69}
70
71impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static>
72    LoggedStream<S, Formatter, Filter, L>
73{
74    /// Construct a new instance of [`LoggedStream`] using provided arguments.
75    pub fn new(stream: S, formatter: Formatter, filter: Filter, logger: L) -> Self {
76        Self {
77            inner_stream: stream,
78            formatter,
79            filter,
80            logger,
81        }
82    }
83}
84
85impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
86    LoggedStream<S, Formatter, Filter, MemoryStorageLogger>
87{
88    #[inline]
89    pub fn get_log_records(&self) -> collections::VecDeque<Record> {
90        self.logger.get_log_records()
91    }
92
93    #[inline]
94    pub fn clear_log_records(&mut self) {
95        self.logger.clear_log_records()
96    }
97}
98
99impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
100    LoggedStream<S, Formatter, Filter, ChannelLogger>
101{
102    #[inline]
103    pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<Record>> {
104        self.logger.take_receiver()
105    }
106
107    #[inline]
108    pub fn take_receiver_unchecked(&mut self) -> mpsc::Receiver<Record> {
109        self.logger.take_receiver_unchecked()
110    }
111}
112
113impl<
114        S: fmt::Debug + 'static,
115        Formatter: fmt::Debug + 'static,
116        Filter: RecordFilter + fmt::Debug + 'static,
117        L: Logger + fmt::Debug + 'static,
118    > fmt::Debug for LoggedStream<S, Formatter, Filter, L>
119{
120    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121        f.debug_struct("LoggedStream")
122            .field("inner_stream", &self.inner_stream)
123            .field("formatter", &self.formatter)
124            .field("filter", &self.filter)
125            .field("logger", &self.logger)
126            .finish()
127    }
128}
129
130impl<
131        S: io::Read + 'static,
132        Formatter: BufferFormatter + 'static,
133        Filter: RecordFilter + 'static,
134        L: Logger + 'static,
135    > io::Read for LoggedStream<S, Formatter, Filter, L>
136{
137    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
138        let result = self.inner_stream.read(buf);
139
140        match &result {
141            Ok(length) => {
142                let record = Record::new(
143                    RecordKind::Read,
144                    self.formatter.format_buffer(&buf[0..*length]),
145                );
146                if self.filter.check(&record) {
147                    self.logger.log(record);
148                }
149            }
150            Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => {}
151            Err(e) => self.logger.log(Record::new(
152                RecordKind::Error,
153                format!("Error during read: {e}"),
154            )),
155        };
156
157        result
158    }
159}
160
161impl<
162        S: tokio_io::AsyncRead + Unpin + 'static,
163        Formatter: BufferFormatter + Unpin + 'static,
164        Filter: RecordFilter + Unpin + 'static,
165        L: Logger + Unpin + 'static,
166    > tokio_io::AsyncRead for LoggedStream<S, Formatter, Filter, L>
167{
168    fn poll_read(
169        self: Pin<&mut Self>,
170        cx: &mut Context<'_>,
171        buf: &mut tokio_io::ReadBuf<'_>,
172    ) -> Poll<io::Result<()>> {
173        let mut_self = self.get_mut();
174        let length_before_read = buf.filled().len();
175        let result = Pin::new(&mut mut_self.inner_stream).poll_read(cx, buf);
176        let length_after_read = buf.filled().len();
177        let diff = length_after_read - length_before_read;
178
179        match &result {
180            Poll::Ready(Ok(())) if diff == 0 => {}
181            Poll::Ready(Ok(())) => {
182                let record = Record::new(
183                    RecordKind::Read,
184                    mut_self
185                        .formatter
186                        .format_buffer(&(buf.filled())[length_before_read..length_after_read]),
187                );
188                if mut_self.filter.check(&record) {
189                    mut_self.logger.log(record);
190                }
191            }
192            Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
193                RecordKind::Error,
194                format!("Error during async read: {e}"),
195            )),
196            Poll::Pending => {}
197        }
198
199        result
200    }
201}
202
203impl<
204        S: io::Write + 'static,
205        Formatter: BufferFormatter + 'static,
206        Filter: RecordFilter + 'static,
207        L: Logger + 'static,
208    > io::Write for LoggedStream<S, Formatter, Filter, L>
209{
210    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
211        let result = self.inner_stream.write(buf);
212
213        match &result {
214            Ok(length) => {
215                let record = Record::new(
216                    RecordKind::Write,
217                    self.formatter.format_buffer(&buf[0..*length]),
218                );
219                if self.filter.check(&record) {
220                    self.logger.log(record);
221                }
222            }
223            Err(e)
224                if matches!(
225                    e.kind(),
226                    io::ErrorKind::WriteZero | io::ErrorKind::WouldBlock
227                ) => {}
228            Err(e) => self.logger.log(Record::new(
229                RecordKind::Error,
230                format!("Error during write: {e}"),
231            )),
232        };
233
234        result
235    }
236
237    fn flush(&mut self) -> io::Result<()> {
238        self.inner_stream.flush()
239    }
240}
241
242impl<
243        S: tokio_io::AsyncWrite + Unpin + 'static,
244        Formatter: BufferFormatter + Unpin + 'static,
245        Filter: RecordFilter + Unpin + 'static,
246        L: Logger + Unpin + 'static,
247    > tokio_io::AsyncWrite for LoggedStream<S, Formatter, Filter, L>
248{
249    fn poll_write(
250        self: Pin<&mut Self>,
251        cx: &mut Context<'_>,
252        buf: &[u8],
253    ) -> Poll<Result<usize, io::Error>> {
254        let mut_self = self.get_mut();
255        let result = Pin::new(&mut mut_self.inner_stream).poll_write(cx, buf);
256        match &result {
257            Poll::Ready(Ok(length)) => {
258                let record = Record::new(
259                    RecordKind::Write,
260                    mut_self.formatter.format_buffer(&buf[0..*length]),
261                );
262                if mut_self.filter.check(&record) {
263                    mut_self.logger.log(record);
264                }
265            }
266            Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
267                RecordKind::Error,
268                format!("Error during async write: {e}"),
269            )),
270            Poll::Pending => {}
271        }
272        result
273    }
274
275    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
276        Pin::new(&mut self.get_mut().inner_stream).poll_flush(cx)
277    }
278
279    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
280        let mut_self = self.get_mut();
281        let result = Pin::new(&mut mut_self.inner_stream).poll_shutdown(cx);
282        let record = Record::new(
283            RecordKind::Shutdown,
284            String::from("Writer shutdown request."),
285        );
286        if mut_self.filter.check(&record) {
287            mut_self.logger.log(record);
288        }
289        result
290    }
291}
292
293impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static> Drop
294    for LoggedStream<S, Formatter, Filter, L>
295{
296    fn drop(&mut self) {
297        let record = Record::new(RecordKind::Drop, String::from("Deallocated."));
298        if self.filter.check(&record) {
299            self.logger.log(record);
300        }
301    }
302}