iroh_io/
stats.rs

1//! Utilities for measuring time for io ops.
2//!
3//! This is useful for debugging performance issues and for io bookkeeping.
//! These wrappers do not distinguish between successful and failed operations.
5//! The expectation is that failed operations will be counted separately.
6//!
7//! Statistics are always added using saturating arithmetic, so there can't be
8//! a panic even in unlikely scenarios.
9use std::{
10    future::Future,
11    io,
12    pin::Pin,
13    task::{Context, Poll},
14    time::Duration,
15};
16
17use bytes::Bytes;
18use pin_project::pin_project;
19
20use crate::{AsyncSliceReader, AsyncSliceWriter, AsyncStreamReader, AsyncStreamWriter};
21
/// Call count and accumulated time for a single tracked operation.
///
/// The default value has a count of zero and a zero duration.
#[derive(Clone, Copy, Debug, Default)]
pub struct Stats {
    /// How many calls of the operation have been recorded.
    pub count: u64,
    /// Sum of the time spent in all recorded calls.
    pub duration: Duration,
}
30
31impl std::ops::Add<Stats> for Stats {
32    type Output = Stats;
33
34    fn add(self, rhs: Stats) -> Self::Output {
35        Self {
36            count: self.count.saturating_add(rhs.count),
37            duration: self.duration.saturating_add(rhs.duration),
38        }
39    }
40}
41
42impl std::ops::AddAssign<Stats> for Stats {
43    fn add_assign(&mut self, rhs: Stats) {
44        *self = *self + rhs;
45    }
46}
47
48/// Statistics about a tracked operation.
49#[derive(Debug, Clone, Copy, Default)]
50pub struct SizeAndStats {
51    /// The number of bytes read or written.
52    pub size: u64,
53    /// Statistics about the operation.
54    pub stats: Stats,
55}
56
57impl From<Stats> for SizeAndStats {
58    fn from(stats: Stats) -> Self {
59        Self { size: 0, stats }
60    }
61}
62
63impl std::ops::Add<SizeAndStats> for SizeAndStats {
64    type Output = SizeAndStats;
65
66    fn add(self, rhs: SizeAndStats) -> Self::Output {
67        Self {
68            size: self.size.saturating_add(rhs.size),
69            stats: self.stats + rhs.stats,
70        }
71    }
72}
73
74impl std::ops::AddAssign<SizeAndStats> for SizeAndStats {
75    fn add_assign(&mut self, rhs: SizeAndStats) {
76        *self = *self + rhs;
77    }
78}
79
80/// Statistics about a stream writer.
81#[derive(Debug, Clone, Copy, Default)]
82pub struct StreamWriterStats {
83    /// Statistics about the `write` operation.
84    pub write: SizeAndStats,
85    /// Statistics about the `write_bytes` operation.
86    pub write_bytes: SizeAndStats,
87    /// Statistics about the `sync` operation.
88    pub sync: Stats,
89}
90
91impl StreamWriterStats {
92    /// Gives the total stats for this writer.
93    ///
94    /// This adds the count and duration from all three operations,
95    /// and the total number of bytes written from write and wite_bytes.
96    ///
97    /// It is important to also add the sync stats, because some buffered
98    /// writers will do most actual io in sync.
99    pub fn total(&self) -> SizeAndStats {
100        self.write + self.write_bytes + self.sync.into()
101    }
102}
103
104impl std::ops::Add<StreamWriterStats> for StreamWriterStats {
105    type Output = StreamWriterStats;
106
107    fn add(self, rhs: StreamWriterStats) -> Self::Output {
108        Self {
109            write: self.write + rhs.write,
110            write_bytes: self.write_bytes + rhs.write_bytes,
111            sync: self.sync + rhs.sync,
112        }
113    }
114}
115
116impl std::ops::AddAssign<StreamWriterStats> for StreamWriterStats {
117    fn add_assign(&mut self, rhs: StreamWriterStats) {
118        *self = *self + rhs;
119    }
120}
121
122/// A stream writer that tracks the time spent in write operations.
123#[derive(Debug, Clone)]
124pub struct TrackingStreamWriter<W> {
125    inner: W,
126    /// Statistics about the write operations.
127    stats: StreamWriterStats,
128}
129
130impl<W> TrackingStreamWriter<W> {
131    /// Create a new `TrackingStreamWriter`.
132    pub fn new(inner: W) -> Self {
133        Self {
134            inner,
135            stats: Default::default(),
136        }
137    }
138
139    /// Get the statistics about the write operations.
140    pub fn stats(&self) -> StreamWriterStats {
141        self.stats
142    }
143}
144
145impl<W: AsyncStreamWriter> AsyncStreamWriter for TrackingStreamWriter<W> {
146    async fn write(&mut self, data: &[u8]) -> io::Result<()> {
147        // increase the size by the length of the data, even if the write fails
148        self.stats.write.size = self.stats.write.size.saturating_add(data.len() as u64);
149        AggregateStats::new(self.inner.write(data), &mut self.stats.write.stats).await
150    }
151
152    async fn write_bytes(&mut self, data: bytes::Bytes) -> io::Result<()> {
153        // increase the size by the length of the data, even if the write fails
154        self.stats.write_bytes.size = self
155            .stats
156            .write_bytes
157            .size
158            .saturating_add(data.len() as u64);
159        AggregateStats::new(
160            self.inner.write_bytes(data),
161            &mut self.stats.write_bytes.stats,
162        )
163        .await
164    }
165
166    async fn sync(&mut self) -> io::Result<()> {
167        AggregateStats::new(self.inner.sync(), &mut self.stats.sync).await
168    }
169}
170
171/// Statistics about a stream writer.
172#[derive(Debug, Clone, Copy, Default)]
173pub struct StreamReaderStats {
174    /// Statistics about the `read` operation.
175    pub read: SizeAndStats,
176}
177
178impl StreamReaderStats {
179    /// Gives the total stats for this reader.
180    pub fn total(&self) -> SizeAndStats {
181        self.read
182    }
183}
184
185impl std::ops::Add<StreamReaderStats> for StreamReaderStats {
186    type Output = StreamReaderStats;
187
188    fn add(self, rhs: StreamReaderStats) -> Self::Output {
189        Self {
190            read: self.read + rhs.read,
191        }
192    }
193}
194
195impl std::ops::AddAssign<StreamReaderStats> for StreamReaderStats {
196    fn add_assign(&mut self, rhs: StreamReaderStats) {
197        *self = *self + rhs;
198    }
199}
200
201/// A stream writer that tracks the time spent in write operations.
202#[derive(Debug, Clone)]
203pub struct TrackingStreamReader<W> {
204    inner: W,
205    /// Statistics about the write operations.
206    stats: StreamReaderStats,
207}
208
209impl<W> TrackingStreamReader<W> {
210    /// Create a new `TrackingStreamWriter`.
211    pub fn new(inner: W) -> Self {
212        Self {
213            inner,
214            stats: Default::default(),
215        }
216    }
217
218    /// Get the statistics about the write operations.
219    pub fn stats(&self) -> StreamReaderStats {
220        self.stats
221    }
222}
223
224impl<W: AsyncStreamReader> AsyncStreamReader for TrackingStreamReader<W> {
225    async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
226        AggregateSizeAndStats::new(self.inner.read_bytes(len), &mut self.stats.read).await
227    }
228
229    async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
230        AggregateSizeAndStats::new(self.inner.read(), &mut self.stats.read).await
231    }
232}
233
234/// Statistics about a slice reader.
235#[derive(Debug, Clone, Copy, Default)]
236pub struct SliceReaderStats {
237    /// Statistics about the `read_at` operation.
238    pub read_at: SizeAndStats,
239    /// Statistics about the `len` operation.
240    pub len: Stats,
241}
242
243impl SliceReaderStats {
244    /// Gives the total stats for this reader.
245    pub fn total(&self) -> SizeAndStats {
246        self.read_at + self.len.into()
247    }
248}
249
250impl std::ops::Add<SliceReaderStats> for SliceReaderStats {
251    type Output = SliceReaderStats;
252
253    fn add(self, rhs: SliceReaderStats) -> Self::Output {
254        Self {
255            read_at: self.read_at + rhs.read_at,
256            len: self.len + rhs.len,
257        }
258    }
259}
260
261impl std::ops::AddAssign<SliceReaderStats> for SliceReaderStats {
262    fn add_assign(&mut self, rhs: SliceReaderStats) {
263        *self = *self + rhs;
264    }
265}
266
267/// A slice reader that tracks the time spent in read operations.
268#[derive(Debug, Clone)]
269pub struct TrackingSliceReader<R> {
270    inner: R,
271    /// Statistics about the read operations.
272    stats: SliceReaderStats,
273}
274
275impl<R: AsyncSliceReader> TrackingSliceReader<R> {
276    /// Create a new `TrackingSliceReader`.
277    pub fn new(inner: R) -> Self {
278        Self {
279            inner,
280            stats: Default::default(),
281        }
282    }
283
284    /// Get the statistics about the read operations.
285    pub fn stats(&self) -> SliceReaderStats {
286        self.stats
287    }
288}
289
290impl<R: AsyncSliceReader> AsyncSliceReader for TrackingSliceReader<R> {
291    async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
292        AggregateSizeAndStats::new(self.inner.read_at(offset, len), &mut self.stats.read_at).await
293    }
294
295    async fn size(&mut self) -> io::Result<u64> {
296        AggregateStats::new(self.inner.size(), &mut self.stats.len).await
297    }
298}
299
300/// Statistics about a slice writer.
301#[derive(Debug, Clone, Copy, Default)]
302pub struct SliceWriterStats {
303    /// Statistics about the `write_at` operation.
304    pub write_at: SizeAndStats,
305    /// Statistics about the `write_bytes_at` operation.
306    pub write_bytes_at: SizeAndStats,
307    /// Statistics about the `set_len` operation.
308    pub set_len: Stats,
309    /// Statistics about the `sync` operation.
310    pub sync: Stats,
311}
312
313impl SliceWriterStats {
314    /// Gives the total stats for this writer.
315    pub fn total(&self) -> SizeAndStats {
316        self.write_at + self.write_bytes_at + self.set_len.into() + self.sync.into()
317    }
318}
319
320impl std::ops::Add<SliceWriterStats> for SliceWriterStats {
321    type Output = SliceWriterStats;
322
323    fn add(self, rhs: SliceWriterStats) -> Self::Output {
324        Self {
325            write_at: self.write_at + rhs.write_at,
326            write_bytes_at: self.write_bytes_at + rhs.write_bytes_at,
327            set_len: self.set_len + rhs.set_len,
328            sync: self.sync + rhs.sync,
329        }
330    }
331}
332
333impl std::ops::AddAssign<SliceWriterStats> for SliceWriterStats {
334    fn add_assign(&mut self, rhs: SliceWriterStats) {
335        *self = *self + rhs;
336    }
337}
338
339/// A slice writer that tracks the time spent in write operations.
340#[derive(Debug, Clone)]
341pub struct TrackingSliceWriter<W> {
342    inner: W,
343    /// Statistics about the write operations.
344    stats: SliceWriterStats,
345}
346
347impl<W> TrackingSliceWriter<W> {
348    /// Create a new `TrackingSliceWriter`.
349    pub fn new(inner: W) -> Self {
350        Self {
351            inner,
352            stats: Default::default(),
353        }
354    }
355
356    /// Get the statistics about the write operations.
357    pub fn stats(&self) -> SliceWriterStats {
358        self.stats
359    }
360}
361
362impl<W: AsyncSliceWriter> AsyncSliceWriter for TrackingSliceWriter<W> {
363    async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
364        // increase the size by the length of the data, even if the write fails
365        self.stats.write_at.size = self.stats.write_at.size.saturating_add(data.len() as u64);
366        AggregateStats::new(
367            self.inner.write_at(offset, data),
368            &mut self.stats.write_at.stats,
369        )
370        .await
371    }
372
373    async fn write_bytes_at(&mut self, offset: u64, data: bytes::Bytes) -> io::Result<()> {
374        // increase the size by the length of the data, even if the write fails
375        self.stats.write_bytes_at.size = self
376            .stats
377            .write_bytes_at
378            .size
379            .saturating_add(data.len() as u64);
380        AggregateStats::new(
381            self.inner.write_bytes_at(offset, data),
382            &mut self.stats.write_bytes_at.stats,
383        )
384        .await
385    }
386
387    async fn set_len(&mut self, len: u64) -> io::Result<()> {
388        AggregateStats::new(self.inner.set_len(len), &mut self.stats.set_len).await
389    }
390
391    async fn sync(&mut self) -> io::Result<()> {
392        AggregateStats::new(self.inner.sync(), &mut self.stats.sync).await
393    }
394}
395
396/// A future that measures the time spent until it is ready.
397#[pin_project]
398#[derive(Debug)]
399pub struct AggregateStats<'a, F> {
400    #[pin]
401    inner: F,
402    start: std::time::Instant,
403    target: &'a mut Stats,
404}
405
406impl<'a, F: Future> AggregateStats<'a, F> {
407    /// Create a new `WriteTiming` future.
408    pub fn new(inner: F, target: &'a mut Stats) -> Self {
409        Self {
410            inner,
411            target,
412            start: std::time::Instant::now(),
413        }
414    }
415}
416
417impl<F: Future> Future for AggregateStats<'_, F> {
418    type Output = F::Output;
419
420    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
421        let p = self.project();
422        match p.inner.poll(cx) {
423            Poll::Ready(x) => {
424                p.target.duration = p.target.duration.saturating_add(p.start.elapsed());
425                p.target.count = p.target.count.saturating_add(1);
426                Poll::Ready(x)
427            }
428            Poll::Pending => Poll::Pending,
429        }
430    }
431}
432
433/// A future that measures the time spent until it is ready, and the size of the result.
434#[pin_project]
435#[derive(Debug)]
436pub struct AggregateSizeAndStats<'a, F> {
437    #[pin]
438    inner: F,
439    start: std::time::Instant,
440    target: &'a mut SizeAndStats,
441}
442
443impl<'a, F: Future> AggregateSizeAndStats<'a, F> {
444    /// Create a new `WriteTiming` future.
445    pub fn new(inner: F, target: &'a mut SizeAndStats) -> Self {
446        Self {
447            inner,
448            target,
449            start: std::time::Instant::now(),
450        }
451    }
452}
453
/// Implemented by result types that may be able to report a size.
pub trait ReadResult {
    /// Returns the size of the value, or `None` when no size is known.
    fn size(&self) -> Option<u64>;
}
459
460impl<T: AsRef<[u8]>> ReadResult for std::io::Result<T> {
461    fn size(&self) -> Option<u64> {
462        match self {
463            Ok(x) => Some(x.as_ref().len() as u64),
464            Err(_) => None,
465        }
466    }
467}
468
469impl<F: Future> Future for AggregateSizeAndStats<'_, F>
470where
471    F::Output: ReadResult,
472{
473    type Output = F::Output;
474
475    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
476        let p = self.project();
477        match p.inner.poll(cx) {
478            Poll::Ready(x) => {
479                p.target.stats.duration = p.target.stats.duration.saturating_add(p.start.elapsed());
480                p.target.stats.count = p.target.stats.count.saturating_add(1);
481                if let Some(size) = x.size() {
482                    // increase the size by the length of the data, if known
483                    // e.g. an error will not have a size
484                    p.target.size = p.target.size.saturating_add(size);
485                }
486                Poll::Ready(x)
487            }
488            Poll::Pending => Poll::Pending,
489        }
490    }
491}
492
#[cfg(test)]
mod tests {
    use super::*;

    /// Stream writes are tallied per method, together with their byte totals.
    #[tokio::test]
    async fn tracking_stream_writer() {
        let mut tracked = TrackingStreamWriter::new(Vec::<u8>::new());
        tracked.write(&[1, 2, 3]).await.unwrap();
        tracked.write(&[1, 2, 3]).await.unwrap();
        tracked.write_bytes(vec![1, 2, 3].into()).await.unwrap();
        tracked.sync().await.unwrap();

        let stats = tracked.stats();
        assert_eq!(stats.write.size, 6);
        assert_eq!(stats.write.stats.count, 2);
        assert_eq!(stats.write_bytes.size, 3);
        assert_eq!(stats.write_bytes.stats.count, 1);
        assert_eq!(stats.sync.count, 1);
    }

    /// Stream reads count the bytes actually returned, not the bytes requested.
    #[tokio::test]
    async fn tracking_stream_reader() {
        let mut tracked = TrackingStreamReader::new(Bytes::from(vec![0, 1, 2, 3]));
        tracked.read_bytes(2).await.unwrap();
        tracked.read_bytes(3).await.unwrap();

        let stats = tracked.stats();
        // 4 rather than 5: the second read asked for 3 bytes but only 2 were left.
        assert_eq!(stats.read.size, 4);
        assert_eq!(stats.read.stats.count, 2);
    }

    /// Slice writes, `sync` and `set_len` each land in their own entry.
    #[tokio::test]
    async fn tracking_slice_writer() {
        let mut tracked = TrackingSliceWriter::new(Vec::new());
        tracked.write_at(0, &[1, 2, 3]).await.unwrap();
        tracked.write_at(10, &[1, 2, 3]).await.unwrap();
        tracked
            .write_bytes_at(20, vec![1, 2, 3].into())
            .await
            .unwrap();
        tracked.sync().await.unwrap();
        tracked.set_len(0).await.unwrap();

        let stats = tracked.stats();
        assert_eq!(stats.write_at.size, 6);
        assert_eq!(stats.write_at.stats.count, 2);
        assert_eq!(stats.write_bytes_at.size, 3);
        assert_eq!(stats.write_bytes_at.stats.count, 1);
        assert_eq!(stats.set_len.count, 1);
        assert_eq!(stats.sync.count, 1);
    }

    /// Slice reads and `size` calls are counted; both reads count as calls,
    /// but only one byte in total is recorded.
    #[tokio::test]
    async fn tracking_slice_reader() {
        let mut tracked = TrackingSliceReader::new(Bytes::from(vec![1u8, 2, 3]));
        let _ = tracked.read_at(0, 1).await.unwrap();
        let _ = tracked.read_at(10, 1).await.unwrap();
        let _ = tracked.size().await.unwrap();

        let stats = tracked.stats();
        assert_eq!(stats.read_at.size, 1);
        assert_eq!(stats.read_at.stats.count, 2);
        assert_eq!(stats.len.count, 1);
    }
}