logged_stream/
stream.rs

1use crate::buffer_formatter::BufferFormatter;
2use crate::logger::Logger;
3use crate::record::Record;
4use crate::record::RecordKind;
5use crate::ChannelLogger;
6use crate::MemoryStorageLogger;
7use crate::RecordFilter;
8use std::collections;
9use std::fmt;
10use std::io;
11use std::pin::Pin;
12use std::sync::mpsc;
13use std::task::Context;
14use std::task::Poll;
15use tokio::io as tokio_io;
16
17/// This is a structure that can be used as a wrapper for underlying IO object which implements [`Read`]
18/// and [`Write`] traits or their asynchronous analogues from [`tokio`] library [`AsyncRead`] and
19/// [`AsyncWrite`] to enable logging of all read and write operations, errors and drop.
20///
21/// [`LoggedStream`] structure constructs from four parts:
22///
23/// -   Underlying IO object, which must implement [`Write`] and [`Read`] traits or their
24///     asynchronous analogues from [`tokio`] library: [`AsyncRead`] and [`AsyncWrite`].
25/// -   Buffer formatting part, which must implement [`BufferFormatter`] trait provided by this library.
26///     This part of [`LoggedStream`] is responsible for the form you will see the input and
27///     output bytes. Currently this library provides the following implementations of [`BufferFormatter`] trait:
28///     [`LowercaseHexadecimalFormatter`], [`UppercaseHexadecimalFormatter`], [`DecimalFormatter`],
29///     [`BinaryFormatter`] and [`OctalFormatter`]. Also [`BufferFormatter`] is public trait so you are
30///     free to construct your own implementation.
31/// -   Filtering part, which must implement [`RecordFilter`] trait provide by this library.
32///     This part of [`LoggedStream`] is responsible for log records filtering. Currently this library
33///     provides the following implementation of [`RecordFilter`] trait: [`DefaultFilter`] which accepts
34///     all log records and [`RecordKindFilter`] which accepts logs with kinds specified during construct.
35///     Also [`RecordFilter`] is public trait and you are free to construct your own implementation.
36/// -   Logging part, which must implement [`Logger`] trait provided by this library. This part
37///     of [`LoggedStream`] is responsible for further work with constructed, formatter and filtered
38///     log record. For example, it can be outputted to console, written to the file, written to database,
39///     written to the memory for further use or sended by the channel. Currently this library provides
40///     the following implementations of [`Logger`] trait: [`ConsoleLogger`], [`MemoryStorageLogger`],
41///     [`ChannelLogger`] and [`FileLogger`]. Also [`Logger`] is public trait and you are free to construct
42///     your own implementation.
43///
44/// [`Read`]: io::Read
45/// [`Write`]: io::Write
46/// [`AsyncRead`]: tokio::io::AsyncRead
47/// [`AsyncWrite`]: tokio::io::AsyncWrite
48/// [`LowercaseHexadecimalFormatter`]: crate::LowercaseHexadecimalFormatter
49/// [`UppercaseHexadecimalFormatter`]: crate::UppercaseHexadecimalFormatter
50/// [`DecimalFormatter`]: crate::DecimalFormatter
51/// [`BinaryFormatter`]: crate::BinaryFormatter
52/// [`OctalFormatter`]: crate::OctalFormatter
53/// [`DefaultFilter`]: crate::DefaultFilter
54/// [`RecordKindFilter`]: crate::RecordKindFilter
55/// [`ConsoleLogger`]: crate::ConsoleLogger
56/// [`FileLogger`]: crate::FileLogger
57pub struct LoggedStream<
58    S: 'static,
59    Formatter: 'static,
60    Filter: RecordFilter + 'static,
61    L: Logger + 'static,
62> {
63    inner_stream: S,
64    formatter: Formatter,
65    filter: Filter,
66    logger: L,
67}
68
69impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static>
70    LoggedStream<S, Formatter, Filter, L>
71{
72    /// Construct a new instance of [`LoggedStream`] using provided arguments.
73    pub fn new(stream: S, formatter: Formatter, filter: Filter, logger: L) -> Self {
74        Self {
75            inner_stream: stream,
76            formatter,
77            filter,
78            logger,
79        }
80    }
81}
82
83impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
84    LoggedStream<S, Formatter, Filter, MemoryStorageLogger>
85{
86    #[inline]
87    pub fn get_log_records(&self) -> collections::VecDeque<Record> {
88        self.logger.get_log_records()
89    }
90
91    #[inline]
92    pub fn clear_log_records(&mut self) {
93        self.logger.clear_log_records()
94    }
95}
96
97impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
98    LoggedStream<S, Formatter, Filter, ChannelLogger>
99{
100    #[inline]
101    pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<Record>> {
102        self.logger.take_receiver()
103    }
104
105    #[inline]
106    pub fn take_receiver_unchecked(&mut self) -> mpsc::Receiver<Record> {
107        self.logger.take_receiver_unchecked()
108    }
109}
110
111impl<
112        S: fmt::Debug + 'static,
113        Formatter: fmt::Debug + 'static,
114        Filter: RecordFilter + fmt::Debug + 'static,
115        L: Logger + fmt::Debug + 'static,
116    > fmt::Debug for LoggedStream<S, Formatter, Filter, L>
117{
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        f.debug_struct("LoggedStream")
120            .field("inner_stream", &self.inner_stream)
121            .field("formatter", &self.formatter)
122            .field("filter", &self.filter)
123            .field("logger", &self.logger)
124            .finish()
125    }
126}
127
128impl<
129        S: io::Read + 'static,
130        Formatter: BufferFormatter + 'static,
131        Filter: RecordFilter + 'static,
132        L: Logger + 'static,
133    > io::Read for LoggedStream<S, Formatter, Filter, L>
134{
135    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
136        let result = self.inner_stream.read(buf);
137
138        match &result {
139            Ok(length) => {
140                let record = Record::new(
141                    RecordKind::Read,
142                    self.formatter.format_buffer(&buf[0..*length]),
143                );
144                if self.filter.check(&record) {
145                    self.logger.log(record);
146                }
147            }
148            Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => {}
149            Err(e) => self.logger.log(Record::new(
150                RecordKind::Error,
151                format!("Error during read: {e}"),
152            )),
153        };
154
155        result
156    }
157}
158
159impl<
160        S: tokio_io::AsyncRead + Unpin + 'static,
161        Formatter: BufferFormatter + Unpin + 'static,
162        Filter: RecordFilter + Unpin + 'static,
163        L: Logger + Unpin + 'static,
164    > tokio_io::AsyncRead for LoggedStream<S, Formatter, Filter, L>
165{
166    fn poll_read(
167        self: Pin<&mut Self>,
168        cx: &mut Context<'_>,
169        buf: &mut tokio_io::ReadBuf<'_>,
170    ) -> Poll<io::Result<()>> {
171        let mut_self = self.get_mut();
172        let length_before_read = buf.filled().len();
173        let result = Pin::new(&mut mut_self.inner_stream).poll_read(cx, buf);
174        let length_after_read = buf.filled().len();
175        let diff = length_after_read - length_before_read;
176
177        match &result {
178            Poll::Ready(Ok(())) if diff == 0 => {}
179            Poll::Ready(Ok(())) => {
180                let record = Record::new(
181                    RecordKind::Read,
182                    mut_self
183                        .formatter
184                        .format_buffer(&(buf.filled())[length_before_read..length_after_read]),
185                );
186                if mut_self.filter.check(&record) {
187                    mut_self.logger.log(record);
188                }
189            }
190            Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
191                RecordKind::Error,
192                format!("Error during async read: {e}"),
193            )),
194            Poll::Pending => {}
195        }
196
197        result
198    }
199}
200
201impl<
202        S: io::Write + 'static,
203        Formatter: BufferFormatter + 'static,
204        Filter: RecordFilter + 'static,
205        L: Logger + 'static,
206    > io::Write for LoggedStream<S, Formatter, Filter, L>
207{
208    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
209        let result = self.inner_stream.write(buf);
210
211        match &result {
212            Ok(length) => {
213                let record = Record::new(
214                    RecordKind::Write,
215                    self.formatter.format_buffer(&buf[0..*length]),
216                );
217                if self.filter.check(&record) {
218                    self.logger.log(record);
219                }
220            }
221            Err(e)
222                if matches!(
223                    e.kind(),
224                    io::ErrorKind::WriteZero | io::ErrorKind::WouldBlock
225                ) => {}
226            Err(e) => self.logger.log(Record::new(
227                RecordKind::Error,
228                format!("Error during write: {e}"),
229            )),
230        };
231
232        result
233    }
234
235    fn flush(&mut self) -> io::Result<()> {
236        self.inner_stream.flush()
237    }
238}
239
240impl<
241        S: tokio_io::AsyncWrite + Unpin + 'static,
242        Formatter: BufferFormatter + Unpin + 'static,
243        Filter: RecordFilter + Unpin + 'static,
244        L: Logger + Unpin + 'static,
245    > tokio_io::AsyncWrite for LoggedStream<S, Formatter, Filter, L>
246{
247    fn poll_write(
248        self: Pin<&mut Self>,
249        cx: &mut Context<'_>,
250        buf: &[u8],
251    ) -> Poll<Result<usize, io::Error>> {
252        let mut_self = self.get_mut();
253        let result = Pin::new(&mut mut_self.inner_stream).poll_write(cx, buf);
254        match &result {
255            Poll::Ready(Ok(length)) => {
256                let record = Record::new(
257                    RecordKind::Write,
258                    mut_self.formatter.format_buffer(&buf[0..*length]),
259                );
260                if mut_self.filter.check(&record) {
261                    mut_self.logger.log(record);
262                }
263            }
264            Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
265                RecordKind::Error,
266                format!("Error during async write: {e}"),
267            )),
268            Poll::Pending => {}
269        }
270        result
271    }
272
273    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
274        Pin::new(&mut self.get_mut().inner_stream).poll_flush(cx)
275    }
276
277    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
278        let mut_self = self.get_mut();
279        let result = Pin::new(&mut mut_self.inner_stream).poll_shutdown(cx);
280        let record = Record::new(
281            RecordKind::Shutdown,
282            String::from("Writer shutdown request."),
283        );
284        if mut_self.filter.check(&record) {
285            mut_self.logger.log(record);
286        }
287        result
288    }
289}
290
291impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static> Drop
292    for LoggedStream<S, Formatter, Filter, L>
293{
294    fn drop(&mut self) {
295        let record = Record::new(RecordKind::Drop, String::from("Deallocated."));
296        if self.filter.check(&record) {
297            self.logger.log(record);
298        }
299    }
300}