1use crate::buffer_formatter::BufferFormatter;
2use crate::logger::Logger;
3use crate::record::Record;
4use crate::record::RecordKind;
5use crate::ChannelLogger;
6use crate::MemoryStorageLogger;
7use crate::RecordFilter;
8use std::collections;
9use std::fmt;
10use std::io;
11use std::pin::Pin;
12use std::sync::mpsc;
13use std::task::Context;
14use std::task::Poll;
15use tokio::io as tokio_io;
16
17pub struct LoggedStream<
60 S: 'static,
61 Formatter: 'static,
62 Filter: RecordFilter + 'static,
63 L: Logger + 'static,
64> {
65 inner_stream: S,
66 formatter: Formatter,
67 filter: Filter,
68 logger: L,
69}
70
71impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static>
72 LoggedStream<S, Formatter, Filter, L>
73{
74 pub fn new(stream: S, formatter: Formatter, filter: Filter, logger: L) -> Self {
76 Self {
77 inner_stream: stream,
78 formatter,
79 filter,
80 logger,
81 }
82 }
83}
84
85impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
86 LoggedStream<S, Formatter, Filter, MemoryStorageLogger>
87{
88 #[inline]
89 pub fn get_log_records(&self) -> collections::VecDeque<Record> {
90 self.logger.get_log_records()
91 }
92
93 #[inline]
94 pub fn clear_log_records(&mut self) {
95 self.logger.clear_log_records()
96 }
97}
98
99impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
100 LoggedStream<S, Formatter, Filter, ChannelLogger>
101{
102 #[inline]
103 pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<Record>> {
104 self.logger.take_receiver()
105 }
106
107 #[inline]
108 pub fn take_receiver_unchecked(&mut self) -> mpsc::Receiver<Record> {
109 self.logger.take_receiver_unchecked()
110 }
111}
112
113impl<
114 S: fmt::Debug + 'static,
115 Formatter: fmt::Debug + 'static,
116 Filter: RecordFilter + fmt::Debug + 'static,
117 L: Logger + fmt::Debug + 'static,
118 > fmt::Debug for LoggedStream<S, Formatter, Filter, L>
119{
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.debug_struct("LoggedStream")
122 .field("inner_stream", &self.inner_stream)
123 .field("formatter", &self.formatter)
124 .field("filter", &self.filter)
125 .field("logger", &self.logger)
126 .finish()
127 }
128}
129
130impl<
131 S: io::Read + 'static,
132 Formatter: BufferFormatter + 'static,
133 Filter: RecordFilter + 'static,
134 L: Logger + 'static,
135 > io::Read for LoggedStream<S, Formatter, Filter, L>
136{
137 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
138 let result = self.inner_stream.read(buf);
139
140 match &result {
141 Ok(length) => {
142 let record = Record::new(
143 RecordKind::Read,
144 self.formatter.format_buffer(&buf[0..*length]),
145 );
146 if self.filter.check(&record) {
147 self.logger.log(record);
148 }
149 }
150 Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => {}
151 Err(e) => self.logger.log(Record::new(
152 RecordKind::Error,
153 format!("Error during read: {e}"),
154 )),
155 };
156
157 result
158 }
159}
160
161impl<
162 S: tokio_io::AsyncRead + Unpin + 'static,
163 Formatter: BufferFormatter + Unpin + 'static,
164 Filter: RecordFilter + Unpin + 'static,
165 L: Logger + Unpin + 'static,
166 > tokio_io::AsyncRead for LoggedStream<S, Formatter, Filter, L>
167{
168 fn poll_read(
169 self: Pin<&mut Self>,
170 cx: &mut Context<'_>,
171 buf: &mut tokio_io::ReadBuf<'_>,
172 ) -> Poll<io::Result<()>> {
173 let mut_self = self.get_mut();
174 let length_before_read = buf.filled().len();
175 let result = Pin::new(&mut mut_self.inner_stream).poll_read(cx, buf);
176 let length_after_read = buf.filled().len();
177 let diff = length_after_read - length_before_read;
178
179 match &result {
180 Poll::Ready(Ok(())) if diff == 0 => {}
181 Poll::Ready(Ok(())) => {
182 let record = Record::new(
183 RecordKind::Read,
184 mut_self
185 .formatter
186 .format_buffer(&(buf.filled())[length_before_read..length_after_read]),
187 );
188 if mut_self.filter.check(&record) {
189 mut_self.logger.log(record);
190 }
191 }
192 Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
193 RecordKind::Error,
194 format!("Error during async read: {e}"),
195 )),
196 Poll::Pending => {}
197 }
198
199 result
200 }
201}
202
203impl<
204 S: io::Write + 'static,
205 Formatter: BufferFormatter + 'static,
206 Filter: RecordFilter + 'static,
207 L: Logger + 'static,
208 > io::Write for LoggedStream<S, Formatter, Filter, L>
209{
210 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
211 let result = self.inner_stream.write(buf);
212
213 match &result {
214 Ok(length) => {
215 let record = Record::new(
216 RecordKind::Write,
217 self.formatter.format_buffer(&buf[0..*length]),
218 );
219 if self.filter.check(&record) {
220 self.logger.log(record);
221 }
222 }
223 Err(e)
224 if matches!(
225 e.kind(),
226 io::ErrorKind::WriteZero | io::ErrorKind::WouldBlock
227 ) => {}
228 Err(e) => self.logger.log(Record::new(
229 RecordKind::Error,
230 format!("Error during write: {e}"),
231 )),
232 };
233
234 result
235 }
236
237 fn flush(&mut self) -> io::Result<()> {
238 self.inner_stream.flush()
239 }
240}
241
242impl<
243 S: tokio_io::AsyncWrite + Unpin + 'static,
244 Formatter: BufferFormatter + Unpin + 'static,
245 Filter: RecordFilter + Unpin + 'static,
246 L: Logger + Unpin + 'static,
247 > tokio_io::AsyncWrite for LoggedStream<S, Formatter, Filter, L>
248{
249 fn poll_write(
250 self: Pin<&mut Self>,
251 cx: &mut Context<'_>,
252 buf: &[u8],
253 ) -> Poll<Result<usize, io::Error>> {
254 let mut_self = self.get_mut();
255 let result = Pin::new(&mut mut_self.inner_stream).poll_write(cx, buf);
256 match &result {
257 Poll::Ready(Ok(length)) => {
258 let record = Record::new(
259 RecordKind::Write,
260 mut_self.formatter.format_buffer(&buf[0..*length]),
261 );
262 if mut_self.filter.check(&record) {
263 mut_self.logger.log(record);
264 }
265 }
266 Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
267 RecordKind::Error,
268 format!("Error during async write: {e}"),
269 )),
270 Poll::Pending => {}
271 }
272 result
273 }
274
275 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
276 Pin::new(&mut self.get_mut().inner_stream).poll_flush(cx)
277 }
278
279 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
280 let mut_self = self.get_mut();
281 let result = Pin::new(&mut mut_self.inner_stream).poll_shutdown(cx);
282 let record = Record::new(
283 RecordKind::Shutdown,
284 String::from("Writer shutdown request."),
285 );
286 if mut_self.filter.check(&record) {
287 mut_self.logger.log(record);
288 }
289 result
290 }
291}
292
293impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static> Drop
294 for LoggedStream<S, Formatter, Filter, L>
295{
296 fn drop(&mut self) {
297 let record = Record::new(RecordKind::Drop, String::from("Deallocated."));
298 if self.filter.check(&record) {
299 self.logger.log(record);
300 }
301 }
302}