Skip to main content

logged_stream/
stream.rs

1use crate::buffer_formatter::BufferFormatter;
2use crate::logger::Logger;
3use crate::record::Record;
4use crate::record::RecordKind;
5use crate::ChannelLogger;
6use crate::MemoryStorageLogger;
7use crate::RecordFilter;
8use std::collections;
9use std::fmt;
10use std::io;
11use std::pin::Pin;
12use std::sync::mpsc;
13use std::task::Context;
14use std::task::Poll;
15use tokio::io as tokio_io;
16
17/// Wrapper for IO objects to log all read and write operations, errors, and drop events.
18///
19/// This structure can be used as a wrapper for underlying IO objects that implement the [`Read`] and [`Write`] traits,
20/// or their asynchronous analogues from the [`tokio`] library, [`AsyncRead`] and [`AsyncWrite`]. It enables logging
21/// of all read and write operations, errors, and drop events.
22///
23/// [`LoggedStream`] structure constructs from four parts:
24///
25/// -   Underlying IO object, which must implement [`Write`] and [`Read`] traits or their
26///     asynchronous analogues from [`tokio`] library: [`AsyncRead`] and [`AsyncWrite`].
27/// -   Buffer formatting part, which must implement [`BufferFormatter`] trait provided by this library.
28///     This part of [`LoggedStream`] is responsible for the form you will see the input and
29///     output bytes. Currently this library provides the following implementations of [`BufferFormatter`] trait:
30///     [`LowercaseHexadecimalFormatter`], [`UppercaseHexadecimalFormatter`], [`DecimalFormatter`],
31///     [`BinaryFormatter`] and [`OctalFormatter`]. Also [`BufferFormatter`] is public trait so you are
32///     free to construct your own implementation.
33/// -   Filtering part, which must implement [`RecordFilter`] trait provided by this library.
34///     This part of [`LoggedStream`] is responsible for log records filtering. Currently this
35///     library provides the following implementations of [`RecordFilter`] trait: [`DefaultFilter`] which
36///     accepts all log records, [`RecordKindFilter`] which accepts logs with kinds specified during
37///     construct, [`AllFilter`] which combines multiple filters with AND logic (accepts record only if
38///     all underlying filters accept it), and [`AnyFilter`] which combines multiple filters with OR logic
39///     (accepts record if any underlying filter accepts it). Also [`RecordFilter`] is public trait and
40///     you are free to construct your own implementation.
41/// -   Logging part, which must implement [`Logger`] trait provided by this library. This part
42///     of [`LoggedStream`] is responsible for further work with constructed, formatter and filtered
43///     log record. For example, it can be outputted to console, written to the file, written to database,
44///     written to the memory for further use or sended by the channel. Currently this library provides
45///     the following implementations of [`Logger`] trait: [`ConsoleLogger`], [`MemoryStorageLogger`],
46///     [`ChannelLogger`] and [`FileLogger`]. Also [`Logger`] is public trait and you are free to construct
47///     your own implementation.
48///
49/// [`Read`]: io::Read
50/// [`Write`]: io::Write
51/// [`AsyncRead`]: tokio::io::AsyncRead
52/// [`AsyncWrite`]: tokio::io::AsyncWrite
53/// [`LowercaseHexadecimalFormatter`]: crate::LowercaseHexadecimalFormatter
54/// [`UppercaseHexadecimalFormatter`]: crate::UppercaseHexadecimalFormatter
55/// [`DecimalFormatter`]: crate::DecimalFormatter
56/// [`BinaryFormatter`]: crate::BinaryFormatter
57/// [`OctalFormatter`]: crate::OctalFormatter
58/// [`DefaultFilter`]: crate::DefaultFilter
59/// [`RecordKindFilter`]: crate::RecordKindFilter
60/// [`AllFilter`]: crate::AllFilter
61/// [`AnyFilter`]: crate::AnyFilter
62/// [`ConsoleLogger`]: crate::ConsoleLogger
63/// [`FileLogger`]: crate::FileLogger
64pub struct LoggedStream<
65    S: 'static,
66    Formatter: 'static,
67    Filter: RecordFilter + 'static,
68    L: Logger + 'static,
69> {
70    inner_stream: S,
71    formatter: Formatter,
72    filter: Filter,
73    logger: L,
74}
75
76impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static>
77    LoggedStream<S, Formatter, Filter, L>
78{
79    /// Construct a new instance of [`LoggedStream`] using provided arguments.
80    pub fn new(stream: S, formatter: Formatter, filter: Filter, logger: L) -> Self {
81        Self {
82            inner_stream: stream,
83            formatter,
84            filter,
85            logger,
86        }
87    }
88}
89
90impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
91    LoggedStream<S, Formatter, Filter, MemoryStorageLogger>
92{
93    #[inline]
94    pub fn get_log_records(&self) -> collections::VecDeque<Record> {
95        self.logger.get_log_records()
96    }
97
98    #[inline]
99    pub fn clear_log_records(&mut self) {
100        self.logger.clear_log_records()
101    }
102}
103
104impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
105    LoggedStream<S, Formatter, Filter, ChannelLogger>
106{
107    #[inline]
108    pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<Record>> {
109        self.logger.take_receiver()
110    }
111
112    #[inline]
113    pub fn take_receiver_unchecked(&mut self) -> mpsc::Receiver<Record> {
114        self.logger.take_receiver_unchecked()
115    }
116}
117
118impl<
119        S: fmt::Debug + 'static,
120        Formatter: fmt::Debug + 'static,
121        Filter: RecordFilter + fmt::Debug + 'static,
122        L: Logger + fmt::Debug + 'static,
123    > fmt::Debug for LoggedStream<S, Formatter, Filter, L>
124{
125    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126        f.debug_struct("LoggedStream")
127            .field("inner_stream", &self.inner_stream)
128            .field("formatter", &self.formatter)
129            .field("filter", &self.filter)
130            .field("logger", &self.logger)
131            .finish()
132    }
133}
134
135impl<
136        S: io::Read + 'static,
137        Formatter: BufferFormatter + 'static,
138        Filter: RecordFilter + 'static,
139        L: Logger + 'static,
140    > io::Read for LoggedStream<S, Formatter, Filter, L>
141{
142    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
143        let result = self.inner_stream.read(buf);
144
145        match &result {
146            Ok(length) => {
147                let record = Record::new(
148                    RecordKind::Read,
149                    self.formatter.format_buffer(&buf[0..*length]),
150                );
151                if self.filter.check(&record) {
152                    self.logger.log(record);
153                }
154            }
155            Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => {}
156            Err(e) => self.logger.log(Record::new(
157                RecordKind::Error,
158                format!("Error during read: {e}"),
159            )),
160        };
161
162        result
163    }
164}
165
166impl<
167        S: tokio_io::AsyncRead + Unpin + 'static,
168        Formatter: BufferFormatter + Unpin + 'static,
169        Filter: RecordFilter + Unpin + 'static,
170        L: Logger + Unpin + 'static,
171    > tokio_io::AsyncRead for LoggedStream<S, Formatter, Filter, L>
172{
173    fn poll_read(
174        self: Pin<&mut Self>,
175        cx: &mut Context<'_>,
176        buf: &mut tokio_io::ReadBuf<'_>,
177    ) -> Poll<io::Result<()>> {
178        let mut_self = self.get_mut();
179        let length_before_read = buf.filled().len();
180        let result = Pin::new(&mut mut_self.inner_stream).poll_read(cx, buf);
181        let length_after_read = buf.filled().len();
182        let diff = length_after_read - length_before_read;
183
184        match &result {
185            Poll::Ready(Ok(())) if diff == 0 => {}
186            Poll::Ready(Ok(())) => {
187                let record = Record::new(
188                    RecordKind::Read,
189                    mut_self
190                        .formatter
191                        .format_buffer(&(buf.filled())[length_before_read..length_after_read]),
192                );
193                if mut_self.filter.check(&record) {
194                    mut_self.logger.log(record);
195                }
196            }
197            Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
198                RecordKind::Error,
199                format!("Error during async read: {e}"),
200            )),
201            Poll::Pending => {}
202        }
203
204        result
205    }
206}
207
208impl<
209        S: io::Write + 'static,
210        Formatter: BufferFormatter + 'static,
211        Filter: RecordFilter + 'static,
212        L: Logger + 'static,
213    > io::Write for LoggedStream<S, Formatter, Filter, L>
214{
215    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
216        let result = self.inner_stream.write(buf);
217
218        match &result {
219            Ok(length) => {
220                let record = Record::new(
221                    RecordKind::Write,
222                    self.formatter.format_buffer(&buf[0..*length]),
223                );
224                if self.filter.check(&record) {
225                    self.logger.log(record);
226                }
227            }
228            Err(e)
229                if matches!(
230                    e.kind(),
231                    io::ErrorKind::WriteZero | io::ErrorKind::WouldBlock
232                ) => {}
233            Err(e) => self.logger.log(Record::new(
234                RecordKind::Error,
235                format!("Error during write: {e}"),
236            )),
237        };
238
239        result
240    }
241
242    fn flush(&mut self) -> io::Result<()> {
243        self.inner_stream.flush()
244    }
245}
246
247impl<
248        S: tokio_io::AsyncWrite + Unpin + 'static,
249        Formatter: BufferFormatter + Unpin + 'static,
250        Filter: RecordFilter + Unpin + 'static,
251        L: Logger + Unpin + 'static,
252    > tokio_io::AsyncWrite for LoggedStream<S, Formatter, Filter, L>
253{
254    fn poll_write(
255        self: Pin<&mut Self>,
256        cx: &mut Context<'_>,
257        buf: &[u8],
258    ) -> Poll<Result<usize, io::Error>> {
259        let mut_self = self.get_mut();
260        let result = Pin::new(&mut mut_self.inner_stream).poll_write(cx, buf);
261        match &result {
262            Poll::Ready(Ok(length)) => {
263                let record = Record::new(
264                    RecordKind::Write,
265                    mut_self.formatter.format_buffer(&buf[0..*length]),
266                );
267                if mut_self.filter.check(&record) {
268                    mut_self.logger.log(record);
269                }
270            }
271            Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
272                RecordKind::Error,
273                format!("Error during async write: {e}"),
274            )),
275            Poll::Pending => {}
276        }
277        result
278    }
279
280    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
281        Pin::new(&mut self.get_mut().inner_stream).poll_flush(cx)
282    }
283
284    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
285        let mut_self = self.get_mut();
286        let result = Pin::new(&mut mut_self.inner_stream).poll_shutdown(cx);
287        let record = Record::new(
288            RecordKind::Shutdown,
289            String::from("Writer shutdown request."),
290        );
291        if mut_self.filter.check(&record) {
292            mut_self.logger.log(record);
293        }
294        result
295    }
296}
297
298impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static> Drop
299    for LoggedStream<S, Formatter, Filter, L>
300{
301    fn drop(&mut self) {
302        let record = Record::new(RecordKind::Drop, String::from("Deallocated."));
303        if self.filter.check(&record) {
304            self.logger.log(record);
305        }
306    }
307}