1use std::{
10 future::Future,
11 io,
12 pin::Pin,
13 task::{Context, Poll},
14 time::Duration,
15};
16
17use bytes::Bytes;
18use pin_project::pin_project;
19
20use crate::{AsyncSliceReader, AsyncSliceWriter, AsyncStreamReader, AsyncStreamWriter};
21
/// Call statistics for a single kind of operation.
///
/// Both fields are zero for the [`Default`] value.
#[derive(Debug, Default, Clone, Copy)]
pub struct Stats {
    /// How many operations have been recorded.
    pub count: u64,
    /// Total time attributed to the recorded operations.
    pub duration: Duration,
}
30
31impl std::ops::Add<Stats> for Stats {
32 type Output = Stats;
33
34 fn add(self, rhs: Stats) -> Self::Output {
35 Self {
36 count: self.count.saturating_add(rhs.count),
37 duration: self.duration.saturating_add(rhs.duration),
38 }
39 }
40}
41
42impl std::ops::AddAssign<Stats> for Stats {
43 fn add_assign(&mut self, rhs: Stats) {
44 *self = *self + rhs;
45 }
46}
47
48#[derive(Debug, Clone, Copy, Default)]
50pub struct SizeAndStats {
51 pub size: u64,
53 pub stats: Stats,
55}
56
57impl From<Stats> for SizeAndStats {
58 fn from(stats: Stats) -> Self {
59 Self { size: 0, stats }
60 }
61}
62
63impl std::ops::Add<SizeAndStats> for SizeAndStats {
64 type Output = SizeAndStats;
65
66 fn add(self, rhs: SizeAndStats) -> Self::Output {
67 Self {
68 size: self.size.saturating_add(rhs.size),
69 stats: self.stats + rhs.stats,
70 }
71 }
72}
73
74impl std::ops::AddAssign<SizeAndStats> for SizeAndStats {
75 fn add_assign(&mut self, rhs: SizeAndStats) {
76 *self = *self + rhs;
77 }
78}
79
80#[derive(Debug, Clone, Copy, Default)]
82pub struct StreamWriterStats {
83 pub write: SizeAndStats,
85 pub write_bytes: SizeAndStats,
87 pub sync: Stats,
89}
90
91impl StreamWriterStats {
92 pub fn total(&self) -> SizeAndStats {
100 self.write + self.write_bytes + self.sync.into()
101 }
102}
103
104impl std::ops::Add<StreamWriterStats> for StreamWriterStats {
105 type Output = StreamWriterStats;
106
107 fn add(self, rhs: StreamWriterStats) -> Self::Output {
108 Self {
109 write: self.write + rhs.write,
110 write_bytes: self.write_bytes + rhs.write_bytes,
111 sync: self.sync + rhs.sync,
112 }
113 }
114}
115
116impl std::ops::AddAssign<StreamWriterStats> for StreamWriterStats {
117 fn add_assign(&mut self, rhs: StreamWriterStats) {
118 *self = *self + rhs;
119 }
120}
121
122#[derive(Debug, Clone)]
124pub struct TrackingStreamWriter<W> {
125 inner: W,
126 stats: StreamWriterStats,
128}
129
130impl<W> TrackingStreamWriter<W> {
131 pub fn new(inner: W) -> Self {
133 Self {
134 inner,
135 stats: Default::default(),
136 }
137 }
138
139 pub fn stats(&self) -> StreamWriterStats {
141 self.stats
142 }
143}
144
145impl<W: AsyncStreamWriter> AsyncStreamWriter for TrackingStreamWriter<W> {
146 async fn write(&mut self, data: &[u8]) -> io::Result<()> {
147 self.stats.write.size = self.stats.write.size.saturating_add(data.len() as u64);
149 AggregateStats::new(self.inner.write(data), &mut self.stats.write.stats).await
150 }
151
152 async fn write_bytes(&mut self, data: bytes::Bytes) -> io::Result<()> {
153 self.stats.write_bytes.size = self
155 .stats
156 .write_bytes
157 .size
158 .saturating_add(data.len() as u64);
159 AggregateStats::new(
160 self.inner.write_bytes(data),
161 &mut self.stats.write_bytes.stats,
162 )
163 .await
164 }
165
166 async fn sync(&mut self) -> io::Result<()> {
167 AggregateStats::new(self.inner.sync(), &mut self.stats.sync).await
168 }
169}
170
171#[derive(Debug, Clone, Copy, Default)]
173pub struct StreamReaderStats {
174 pub read: SizeAndStats,
176}
177
178impl StreamReaderStats {
179 pub fn total(&self) -> SizeAndStats {
181 self.read
182 }
183}
184
185impl std::ops::Add<StreamReaderStats> for StreamReaderStats {
186 type Output = StreamReaderStats;
187
188 fn add(self, rhs: StreamReaderStats) -> Self::Output {
189 Self {
190 read: self.read + rhs.read,
191 }
192 }
193}
194
195impl std::ops::AddAssign<StreamReaderStats> for StreamReaderStats {
196 fn add_assign(&mut self, rhs: StreamReaderStats) {
197 *self = *self + rhs;
198 }
199}
200
201#[derive(Debug, Clone)]
203pub struct TrackingStreamReader<W> {
204 inner: W,
205 stats: StreamReaderStats,
207}
208
209impl<W> TrackingStreamReader<W> {
210 pub fn new(inner: W) -> Self {
212 Self {
213 inner,
214 stats: Default::default(),
215 }
216 }
217
218 pub fn stats(&self) -> StreamReaderStats {
220 self.stats
221 }
222}
223
224impl<W: AsyncStreamReader> AsyncStreamReader for TrackingStreamReader<W> {
225 async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
226 AggregateSizeAndStats::new(self.inner.read_bytes(len), &mut self.stats.read).await
227 }
228
229 async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
230 AggregateSizeAndStats::new(self.inner.read(), &mut self.stats.read).await
231 }
232}
233
234#[derive(Debug, Clone, Copy, Default)]
236pub struct SliceReaderStats {
237 pub read_at: SizeAndStats,
239 pub len: Stats,
241}
242
243impl SliceReaderStats {
244 pub fn total(&self) -> SizeAndStats {
246 self.read_at + self.len.into()
247 }
248}
249
250impl std::ops::Add<SliceReaderStats> for SliceReaderStats {
251 type Output = SliceReaderStats;
252
253 fn add(self, rhs: SliceReaderStats) -> Self::Output {
254 Self {
255 read_at: self.read_at + rhs.read_at,
256 len: self.len + rhs.len,
257 }
258 }
259}
260
261impl std::ops::AddAssign<SliceReaderStats> for SliceReaderStats {
262 fn add_assign(&mut self, rhs: SliceReaderStats) {
263 *self = *self + rhs;
264 }
265}
266
267#[derive(Debug, Clone)]
269pub struct TrackingSliceReader<R> {
270 inner: R,
271 stats: SliceReaderStats,
273}
274
275impl<R: AsyncSliceReader> TrackingSliceReader<R> {
276 pub fn new(inner: R) -> Self {
278 Self {
279 inner,
280 stats: Default::default(),
281 }
282 }
283
284 pub fn stats(&self) -> SliceReaderStats {
286 self.stats
287 }
288}
289
290impl<R: AsyncSliceReader> AsyncSliceReader for TrackingSliceReader<R> {
291 async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
292 AggregateSizeAndStats::new(self.inner.read_at(offset, len), &mut self.stats.read_at).await
293 }
294
295 async fn size(&mut self) -> io::Result<u64> {
296 AggregateStats::new(self.inner.size(), &mut self.stats.len).await
297 }
298}
299
300#[derive(Debug, Clone, Copy, Default)]
302pub struct SliceWriterStats {
303 pub write_at: SizeAndStats,
305 pub write_bytes_at: SizeAndStats,
307 pub set_len: Stats,
309 pub sync: Stats,
311}
312
313impl SliceWriterStats {
314 pub fn total(&self) -> SizeAndStats {
316 self.write_at + self.write_bytes_at + self.set_len.into() + self.sync.into()
317 }
318}
319
320impl std::ops::Add<SliceWriterStats> for SliceWriterStats {
321 type Output = SliceWriterStats;
322
323 fn add(self, rhs: SliceWriterStats) -> Self::Output {
324 Self {
325 write_at: self.write_at + rhs.write_at,
326 write_bytes_at: self.write_bytes_at + rhs.write_bytes_at,
327 set_len: self.set_len + rhs.set_len,
328 sync: self.sync + rhs.sync,
329 }
330 }
331}
332
333impl std::ops::AddAssign<SliceWriterStats> for SliceWriterStats {
334 fn add_assign(&mut self, rhs: SliceWriterStats) {
335 *self = *self + rhs;
336 }
337}
338
339#[derive(Debug, Clone)]
341pub struct TrackingSliceWriter<W> {
342 inner: W,
343 stats: SliceWriterStats,
345}
346
347impl<W> TrackingSliceWriter<W> {
348 pub fn new(inner: W) -> Self {
350 Self {
351 inner,
352 stats: Default::default(),
353 }
354 }
355
356 pub fn stats(&self) -> SliceWriterStats {
358 self.stats
359 }
360}
361
362impl<W: AsyncSliceWriter> AsyncSliceWriter for TrackingSliceWriter<W> {
363 async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
364 self.stats.write_at.size = self.stats.write_at.size.saturating_add(data.len() as u64);
366 AggregateStats::new(
367 self.inner.write_at(offset, data),
368 &mut self.stats.write_at.stats,
369 )
370 .await
371 }
372
373 async fn write_bytes_at(&mut self, offset: u64, data: bytes::Bytes) -> io::Result<()> {
374 self.stats.write_bytes_at.size = self
376 .stats
377 .write_bytes_at
378 .size
379 .saturating_add(data.len() as u64);
380 AggregateStats::new(
381 self.inner.write_bytes_at(offset, data),
382 &mut self.stats.write_bytes_at.stats,
383 )
384 .await
385 }
386
387 async fn set_len(&mut self, len: u64) -> io::Result<()> {
388 AggregateStats::new(self.inner.set_len(len), &mut self.stats.set_len).await
389 }
390
391 async fn sync(&mut self) -> io::Result<()> {
392 AggregateStats::new(self.inner.sync(), &mut self.stats.sync).await
393 }
394}
395
396#[pin_project]
398#[derive(Debug)]
399pub struct AggregateStats<'a, F> {
400 #[pin]
401 inner: F,
402 start: std::time::Instant,
403 target: &'a mut Stats,
404}
405
406impl<'a, F: Future> AggregateStats<'a, F> {
407 pub fn new(inner: F, target: &'a mut Stats) -> Self {
409 Self {
410 inner,
411 target,
412 start: std::time::Instant::now(),
413 }
414 }
415}
416
417impl<F: Future> Future for AggregateStats<'_, F> {
418 type Output = F::Output;
419
420 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
421 let p = self.project();
422 match p.inner.poll(cx) {
423 Poll::Ready(x) => {
424 p.target.duration = p.target.duration.saturating_add(p.start.elapsed());
425 p.target.count = p.target.count.saturating_add(1);
426 Poll::Ready(x)
427 }
428 Poll::Pending => Poll::Pending,
429 }
430 }
431}
432
433#[pin_project]
435#[derive(Debug)]
436pub struct AggregateSizeAndStats<'a, F> {
437 #[pin]
438 inner: F,
439 start: std::time::Instant,
440 target: &'a mut SizeAndStats,
441}
442
443impl<'a, F: Future> AggregateSizeAndStats<'a, F> {
444 pub fn new(inner: F, target: &'a mut SizeAndStats) -> Self {
446 Self {
447 inner,
448 target,
449 start: std::time::Instant::now(),
450 }
451 }
452}
453
/// Implemented by future outputs from which a byte count can be extracted.
pub trait ReadResult {
    /// Returns the number of bytes to account for, or `None` when there is nothing to count.
    fn size(&self) -> Option<u64>;
}
459
460impl<T: AsRef<[u8]>> ReadResult for std::io::Result<T> {
461 fn size(&self) -> Option<u64> {
462 match self {
463 Ok(x) => Some(x.as_ref().len() as u64),
464 Err(_) => None,
465 }
466 }
467}
468
469impl<F: Future> Future for AggregateSizeAndStats<'_, F>
470where
471 F::Output: ReadResult,
472{
473 type Output = F::Output;
474
475 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
476 let p = self.project();
477 match p.inner.poll(cx) {
478 Poll::Ready(x) => {
479 p.target.stats.duration = p.target.stats.duration.saturating_add(p.start.elapsed());
480 p.target.stats.count = p.target.stats.count.saturating_add(1);
481 if let Some(size) = x.size() {
482 p.target.size = p.target.size.saturating_add(size);
485 }
486 Poll::Ready(x)
487 }
488 Poll::Pending => Poll::Pending,
489 }
490 }
491}
492
#[cfg(test)]
mod tests {
    use super::*;

    /// Each writer method is recorded under its own entry.
    #[tokio::test]
    async fn tracking_stream_writer() {
        let mut writer = TrackingStreamWriter::new(Vec::<u8>::new());
        for _ in 0..2 {
            writer.write(&[1, 2, 3]).await.unwrap();
        }
        writer.write_bytes(vec![1, 2, 3].into()).await.unwrap();
        writer.sync().await.unwrap();

        let stats = writer.stats();
        assert_eq!(stats.write.size, 6);
        assert_eq!(stats.write.stats.count, 2);
        assert_eq!(stats.write_bytes.size, 3);
        assert_eq!(stats.write_bytes.stats.count, 1);
        assert_eq!(stats.sync.count, 1);
    }

    /// Two reads are counted; the source only holds 4 bytes, so the recorded size is
    /// expected to be 4 even though 2 + 3 bytes were requested.
    #[tokio::test]
    async fn tracking_stream_reader() {
        let mut reader = TrackingStreamReader::new(Bytes::from(vec![0, 1, 2, 3]));
        reader.read_bytes(2).await.unwrap();
        reader.read_bytes(3).await.unwrap();

        let stats = reader.stats();
        assert_eq!(stats.read.size, 4);
        assert_eq!(stats.read.stats.count, 2);
    }

    /// Each slice-writer method is recorded under its own entry.
    #[tokio::test]
    async fn tracking_slice_writer() {
        let mut writer = TrackingSliceWriter::new(Vec::new());
        writer.write_at(0, &[1, 2, 3]).await.unwrap();
        writer.write_at(10, &[1, 2, 3]).await.unwrap();
        let payload = vec![1, 2, 3].into();
        writer.write_bytes_at(20, payload).await.unwrap();
        writer.sync().await.unwrap();
        writer.set_len(0).await.unwrap();

        let stats = writer.stats();
        assert_eq!(stats.write_at.size, 6);
        assert_eq!(stats.write_at.stats.count, 2);
        assert_eq!(stats.write_bytes_at.size, 3);
        assert_eq!(stats.write_bytes_at.stats.count, 1);
        assert_eq!(stats.set_len.count, 1);
        assert_eq!(stats.sync.count, 1);
    }

    /// Both `read_at` calls are counted, but the one at offset 10 lies past the 3-byte
    /// source, so only 1 byte is expected in the recorded size.
    #[tokio::test]
    async fn tracking_slice_reader() {
        let mut reader = TrackingSliceReader::new(Bytes::from(vec![1u8, 2, 3]));
        let _ = reader.read_at(0, 1).await.unwrap();
        let _ = reader.read_at(10, 1).await.unwrap();
        let _ = reader.size().await.unwrap();

        let stats = reader.stats();
        assert_eq!(stats.read_at.size, 1);
        assert_eq!(stats.read_at.stats.count, 2);
        assert_eq!(stats.len.count, 1);
    }
}