1use crate::buffer_formatter::BufferFormatter;
2use crate::logger::Logger;
3use crate::record::Record;
4use crate::record::RecordKind;
5use crate::ChannelLogger;
6use crate::MemoryStorageLogger;
7use crate::RecordFilter;
8use std::collections;
9use std::fmt;
10use std::io;
11use std::pin::Pin;
12use std::sync::mpsc;
13use std::task::Context;
14use std::task::Poll;
15use tokio::io as tokio_io;
16
17pub struct LoggedStream<
65 S: 'static,
66 Formatter: 'static,
67 Filter: RecordFilter + 'static,
68 L: Logger + 'static,
69> {
70 inner_stream: S,
71 formatter: Formatter,
72 filter: Filter,
73 logger: L,
74}
75
76impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static>
77 LoggedStream<S, Formatter, Filter, L>
78{
79 pub fn new(stream: S, formatter: Formatter, filter: Filter, logger: L) -> Self {
81 Self {
82 inner_stream: stream,
83 formatter,
84 filter,
85 logger,
86 }
87 }
88}
89
90impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
91 LoggedStream<S, Formatter, Filter, MemoryStorageLogger>
92{
93 #[inline]
94 pub fn get_log_records(&self) -> collections::VecDeque<Record> {
95 self.logger.get_log_records()
96 }
97
98 #[inline]
99 pub fn clear_log_records(&mut self) {
100 self.logger.clear_log_records()
101 }
102}
103
104impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
105 LoggedStream<S, Formatter, Filter, ChannelLogger>
106{
107 #[inline]
108 pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<Record>> {
109 self.logger.take_receiver()
110 }
111
112 #[inline]
113 pub fn take_receiver_unchecked(&mut self) -> mpsc::Receiver<Record> {
114 self.logger.take_receiver_unchecked()
115 }
116}
117
118impl<
119 S: fmt::Debug + 'static,
120 Formatter: fmt::Debug + 'static,
121 Filter: RecordFilter + fmt::Debug + 'static,
122 L: Logger + fmt::Debug + 'static,
123 > fmt::Debug for LoggedStream<S, Formatter, Filter, L>
124{
125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126 f.debug_struct("LoggedStream")
127 .field("inner_stream", &self.inner_stream)
128 .field("formatter", &self.formatter)
129 .field("filter", &self.filter)
130 .field("logger", &self.logger)
131 .finish()
132 }
133}
134
135impl<
136 S: io::Read + 'static,
137 Formatter: BufferFormatter + 'static,
138 Filter: RecordFilter + 'static,
139 L: Logger + 'static,
140 > io::Read for LoggedStream<S, Formatter, Filter, L>
141{
142 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
143 let result = self.inner_stream.read(buf);
144
145 match &result {
146 Ok(length) => {
147 let record = Record::new(
148 RecordKind::Read,
149 self.formatter.format_buffer(&buf[0..*length]),
150 );
151 if self.filter.check(&record) {
152 self.logger.log(record);
153 }
154 }
155 Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => {}
156 Err(e) => self.logger.log(Record::new(
157 RecordKind::Error,
158 format!("Error during read: {e}"),
159 )),
160 };
161
162 result
163 }
164}
165
166impl<
167 S: tokio_io::AsyncRead + Unpin + 'static,
168 Formatter: BufferFormatter + Unpin + 'static,
169 Filter: RecordFilter + Unpin + 'static,
170 L: Logger + Unpin + 'static,
171 > tokio_io::AsyncRead for LoggedStream<S, Formatter, Filter, L>
172{
173 fn poll_read(
174 self: Pin<&mut Self>,
175 cx: &mut Context<'_>,
176 buf: &mut tokio_io::ReadBuf<'_>,
177 ) -> Poll<io::Result<()>> {
178 let mut_self = self.get_mut();
179 let length_before_read = buf.filled().len();
180 let result = Pin::new(&mut mut_self.inner_stream).poll_read(cx, buf);
181 let length_after_read = buf.filled().len();
182 let diff = length_after_read - length_before_read;
183
184 match &result {
185 Poll::Ready(Ok(())) if diff == 0 => {}
186 Poll::Ready(Ok(())) => {
187 let record = Record::new(
188 RecordKind::Read,
189 mut_self
190 .formatter
191 .format_buffer(&(buf.filled())[length_before_read..length_after_read]),
192 );
193 if mut_self.filter.check(&record) {
194 mut_self.logger.log(record);
195 }
196 }
197 Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
198 RecordKind::Error,
199 format!("Error during async read: {e}"),
200 )),
201 Poll::Pending => {}
202 }
203
204 result
205 }
206}
207
208impl<
209 S: io::Write + 'static,
210 Formatter: BufferFormatter + 'static,
211 Filter: RecordFilter + 'static,
212 L: Logger + 'static,
213 > io::Write for LoggedStream<S, Formatter, Filter, L>
214{
215 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
216 let result = self.inner_stream.write(buf);
217
218 match &result {
219 Ok(length) => {
220 let record = Record::new(
221 RecordKind::Write,
222 self.formatter.format_buffer(&buf[0..*length]),
223 );
224 if self.filter.check(&record) {
225 self.logger.log(record);
226 }
227 }
228 Err(e)
229 if matches!(
230 e.kind(),
231 io::ErrorKind::WriteZero | io::ErrorKind::WouldBlock
232 ) => {}
233 Err(e) => self.logger.log(Record::new(
234 RecordKind::Error,
235 format!("Error during write: {e}"),
236 )),
237 };
238
239 result
240 }
241
242 fn flush(&mut self) -> io::Result<()> {
243 self.inner_stream.flush()
244 }
245}
246
247impl<
248 S: tokio_io::AsyncWrite + Unpin + 'static,
249 Formatter: BufferFormatter + Unpin + 'static,
250 Filter: RecordFilter + Unpin + 'static,
251 L: Logger + Unpin + 'static,
252 > tokio_io::AsyncWrite for LoggedStream<S, Formatter, Filter, L>
253{
254 fn poll_write(
255 self: Pin<&mut Self>,
256 cx: &mut Context<'_>,
257 buf: &[u8],
258 ) -> Poll<Result<usize, io::Error>> {
259 let mut_self = self.get_mut();
260 let result = Pin::new(&mut mut_self.inner_stream).poll_write(cx, buf);
261 match &result {
262 Poll::Ready(Ok(length)) => {
263 let record = Record::new(
264 RecordKind::Write,
265 mut_self.formatter.format_buffer(&buf[0..*length]),
266 );
267 if mut_self.filter.check(&record) {
268 mut_self.logger.log(record);
269 }
270 }
271 Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
272 RecordKind::Error,
273 format!("Error during async write: {e}"),
274 )),
275 Poll::Pending => {}
276 }
277 result
278 }
279
280 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
281 Pin::new(&mut self.get_mut().inner_stream).poll_flush(cx)
282 }
283
284 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
285 let mut_self = self.get_mut();
286 let result = Pin::new(&mut mut_self.inner_stream).poll_shutdown(cx);
287 let record = Record::new(
288 RecordKind::Shutdown,
289 String::from("Writer shutdown request."),
290 );
291 if mut_self.filter.check(&record) {
292 mut_self.logger.log(record);
293 }
294 result
295 }
296}
297
298impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static> Drop
299 for LoggedStream<S, Formatter, Filter, L>
300{
301 fn drop(&mut self) {
302 let record = Record::new(RecordKind::Drop, String::from("Deallocated."));
303 if self.filter.check(&record) {
304 self.logger.log(record);
305 }
306 }
307}