1use crate::buffer_formatter::BufferFormatter;
2use crate::logger::Logger;
3use crate::record::Record;
4use crate::record::RecordKind;
5use crate::ChannelLogger;
6use crate::MemoryStorageLogger;
7use crate::RecordFilter;
8use std::collections;
9use std::fmt;
10use std::io;
11use std::pin::Pin;
12use std::sync::mpsc;
13use std::task::Context;
14use std::task::Poll;
15use tokio::io as tokio_io;
16
17pub struct LoggedStream<
58 S: 'static,
59 Formatter: 'static,
60 Filter: RecordFilter + 'static,
61 L: Logger + 'static,
62> {
63 inner_stream: S,
64 formatter: Formatter,
65 filter: Filter,
66 logger: L,
67}
68
69impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static>
70 LoggedStream<S, Formatter, Filter, L>
71{
72 pub fn new(stream: S, formatter: Formatter, filter: Filter, logger: L) -> Self {
74 Self {
75 inner_stream: stream,
76 formatter,
77 filter,
78 logger,
79 }
80 }
81}
82
83impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
84 LoggedStream<S, Formatter, Filter, MemoryStorageLogger>
85{
86 #[inline]
87 pub fn get_log_records(&self) -> collections::VecDeque<Record> {
88 self.logger.get_log_records()
89 }
90
91 #[inline]
92 pub fn clear_log_records(&mut self) {
93 self.logger.clear_log_records()
94 }
95}
96
97impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static>
98 LoggedStream<S, Formatter, Filter, ChannelLogger>
99{
100 #[inline]
101 pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<Record>> {
102 self.logger.take_receiver()
103 }
104
105 #[inline]
106 pub fn take_receiver_unchecked(&mut self) -> mpsc::Receiver<Record> {
107 self.logger.take_receiver_unchecked()
108 }
109}
110
111impl<
112 S: fmt::Debug + 'static,
113 Formatter: fmt::Debug + 'static,
114 Filter: RecordFilter + fmt::Debug + 'static,
115 L: Logger + fmt::Debug + 'static,
116 > fmt::Debug for LoggedStream<S, Formatter, Filter, L>
117{
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.debug_struct("LoggedStream")
120 .field("inner_stream", &self.inner_stream)
121 .field("formatter", &self.formatter)
122 .field("filter", &self.filter)
123 .field("logger", &self.logger)
124 .finish()
125 }
126}
127
128impl<
129 S: io::Read + 'static,
130 Formatter: BufferFormatter + 'static,
131 Filter: RecordFilter + 'static,
132 L: Logger + 'static,
133 > io::Read for LoggedStream<S, Formatter, Filter, L>
134{
135 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
136 let result = self.inner_stream.read(buf);
137
138 match &result {
139 Ok(length) => {
140 let record = Record::new(
141 RecordKind::Read,
142 self.formatter.format_buffer(&buf[0..*length]),
143 );
144 if self.filter.check(&record) {
145 self.logger.log(record);
146 }
147 }
148 Err(e) if matches!(e.kind(), io::ErrorKind::WouldBlock) => {}
149 Err(e) => self.logger.log(Record::new(
150 RecordKind::Error,
151 format!("Error during read: {e}"),
152 )),
153 };
154
155 result
156 }
157}
158
159impl<
160 S: tokio_io::AsyncRead + Unpin + 'static,
161 Formatter: BufferFormatter + Unpin + 'static,
162 Filter: RecordFilter + Unpin + 'static,
163 L: Logger + Unpin + 'static,
164 > tokio_io::AsyncRead for LoggedStream<S, Formatter, Filter, L>
165{
166 fn poll_read(
167 self: Pin<&mut Self>,
168 cx: &mut Context<'_>,
169 buf: &mut tokio_io::ReadBuf<'_>,
170 ) -> Poll<io::Result<()>> {
171 let mut_self = self.get_mut();
172 let length_before_read = buf.filled().len();
173 let result = Pin::new(&mut mut_self.inner_stream).poll_read(cx, buf);
174 let length_after_read = buf.filled().len();
175 let diff = length_after_read - length_before_read;
176
177 match &result {
178 Poll::Ready(Ok(())) if diff == 0 => {}
179 Poll::Ready(Ok(())) => {
180 let record = Record::new(
181 RecordKind::Read,
182 mut_self
183 .formatter
184 .format_buffer(&(buf.filled())[length_before_read..length_after_read]),
185 );
186 if mut_self.filter.check(&record) {
187 mut_self.logger.log(record);
188 }
189 }
190 Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
191 RecordKind::Error,
192 format!("Error during async read: {e}"),
193 )),
194 Poll::Pending => {}
195 }
196
197 result
198 }
199}
200
201impl<
202 S: io::Write + 'static,
203 Formatter: BufferFormatter + 'static,
204 Filter: RecordFilter + 'static,
205 L: Logger + 'static,
206 > io::Write for LoggedStream<S, Formatter, Filter, L>
207{
208 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
209 let result = self.inner_stream.write(buf);
210
211 match &result {
212 Ok(length) => {
213 let record = Record::new(
214 RecordKind::Write,
215 self.formatter.format_buffer(&buf[0..*length]),
216 );
217 if self.filter.check(&record) {
218 self.logger.log(record);
219 }
220 }
221 Err(e)
222 if matches!(
223 e.kind(),
224 io::ErrorKind::WriteZero | io::ErrorKind::WouldBlock
225 ) => {}
226 Err(e) => self.logger.log(Record::new(
227 RecordKind::Error,
228 format!("Error during write: {e}"),
229 )),
230 };
231
232 result
233 }
234
235 fn flush(&mut self) -> io::Result<()> {
236 self.inner_stream.flush()
237 }
238}
239
240impl<
241 S: tokio_io::AsyncWrite + Unpin + 'static,
242 Formatter: BufferFormatter + Unpin + 'static,
243 Filter: RecordFilter + Unpin + 'static,
244 L: Logger + Unpin + 'static,
245 > tokio_io::AsyncWrite for LoggedStream<S, Formatter, Filter, L>
246{
247 fn poll_write(
248 self: Pin<&mut Self>,
249 cx: &mut Context<'_>,
250 buf: &[u8],
251 ) -> Poll<Result<usize, io::Error>> {
252 let mut_self = self.get_mut();
253 let result = Pin::new(&mut mut_self.inner_stream).poll_write(cx, buf);
254 match &result {
255 Poll::Ready(Ok(length)) => {
256 let record = Record::new(
257 RecordKind::Write,
258 mut_self.formatter.format_buffer(&buf[0..*length]),
259 );
260 if mut_self.filter.check(&record) {
261 mut_self.logger.log(record);
262 }
263 }
264 Poll::Ready(Err(e)) => mut_self.logger.log(Record::new(
265 RecordKind::Error,
266 format!("Error during async write: {e}"),
267 )),
268 Poll::Pending => {}
269 }
270 result
271 }
272
273 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
274 Pin::new(&mut self.get_mut().inner_stream).poll_flush(cx)
275 }
276
277 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
278 let mut_self = self.get_mut();
279 let result = Pin::new(&mut mut_self.inner_stream).poll_shutdown(cx);
280 let record = Record::new(
281 RecordKind::Shutdown,
282 String::from("Writer shutdown request."),
283 );
284 if mut_self.filter.check(&record) {
285 mut_self.logger.log(record);
286 }
287 result
288 }
289}
290
291impl<S: 'static, Formatter: 'static, Filter: RecordFilter + 'static, L: Logger + 'static> Drop
292 for LoggedStream<S, Formatter, Filter, L>
293{
294 fn drop(&mut self) {
295 let record = Record::new(RecordKind::Drop, String::from("Deallocated."));
296 if self.filter.check(&record) {
297 self.logger.log(record);
298 }
299 }
300}