iroh_blobs/api/blobs/reader.rs

1use std::{
2    io::{self, ErrorKind, SeekFrom},
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use n0_future::StreamExt;
8
9use crate::{
10    api::{
11        blobs::{Blobs, ReaderOptions},
12        proto::ExportRangesItem,
13    },
14    Hash,
15};
16
17/// A reader for blobs that implements `AsyncRead` and `AsyncSeek`.
18#[derive(Debug)]
19pub struct BlobReader {
20    blobs: Blobs,
21    options: ReaderOptions,
22    state: ReaderState,
23}
24
25#[derive(Default, derive_more::Debug)]
26enum ReaderState {
27    Idle {
28        position: u64,
29    },
30    Seeking {
31        position: u64,
32    },
33    Reading {
34        position: u64,
35        #[debug(skip)]
36        op: n0_future::boxed::BoxStream<ExportRangesItem>,
37    },
38    #[default]
39    Poisoned,
40}
41
42impl BlobReader {
43    pub(super) fn new(blobs: Blobs, options: ReaderOptions) -> Self {
44        Self {
45            blobs,
46            options,
47            state: ReaderState::Idle { position: 0 },
48        }
49    }
50
51    pub fn hash(&self) -> &Hash {
52        &self.options.hash
53    }
54}
55
56impl tokio::io::AsyncRead for BlobReader {
57    fn poll_read(
58        self: Pin<&mut Self>,
59        cx: &mut Context<'_>,
60        buf: &mut tokio::io::ReadBuf<'_>,
61    ) -> Poll<io::Result<()>> {
62        let this = self.get_mut();
63        let mut position1 = None;
64        loop {
65            let guard = &mut this.state;
66            match std::mem::take(guard) {
67                ReaderState::Idle { position } => {
68                    // todo: read until next page boundary instead of fixed size
69                    let len = buf.remaining() as u64;
70                    let end = position.checked_add(len).ok_or_else(|| {
71                        io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
72                    })?;
73                    // start the export op for the entire size of the buffer, and convert to a stream
74                    let stream = this
75                        .blobs
76                        .export_ranges(this.options.hash, position..end)
77                        .stream();
78                    position1 = Some(position);
79                    *guard = ReaderState::Reading {
80                        position,
81                        op: Box::pin(stream),
82                    };
83                }
84                ReaderState::Reading { position, mut op } => {
85                    let position1 = position1.get_or_insert(position);
86                    match op.poll_next(cx) {
87                        Poll::Ready(Some(ExportRangesItem::Size(_))) => {
88                            *guard = ReaderState::Reading { position, op };
89                        }
90                        Poll::Ready(Some(ExportRangesItem::Data(data))) => {
91                            if data.offset != *position1 {
92                                break Poll::Ready(Err(io::Error::other(
93                                    "Data offset does not match expected position",
94                                )));
95                            }
96                            buf.put_slice(&data.data);
97                            // update just local position1, not the position in the state.
98                            *position1 =
99                                position1
100                                    .checked_add(data.data.len() as u64)
101                                    .ok_or_else(|| {
102                                        io::Error::new(ErrorKind::InvalidInput, "Position overflow")
103                                    })?;
104                            *guard = ReaderState::Reading { position, op };
105                        }
106                        Poll::Ready(Some(ExportRangesItem::Error(err))) => {
107                            *guard = ReaderState::Idle { position };
108                            break Poll::Ready(Err(io::Error::other(format!(
109                                "Error reading data: {err}"
110                            ))));
111                        }
112                        Poll::Ready(None) => {
113                            // done with the stream, go back in idle.
114                            *guard = ReaderState::Idle {
115                                position: *position1,
116                            };
117                            break Poll::Ready(Ok(()));
118                        }
119                        Poll::Pending => {
120                            break if position != *position1 {
121                                // we read some data so we need to abort the op.
122                                //
123                                // we can't be sure we won't be called with the same buf size next time.
124                                *guard = ReaderState::Idle {
125                                    position: *position1,
126                                };
127                                Poll::Ready(Ok(()))
128                            } else {
129                                // nothing was read yet, we remain in the reading state
130                                //
131                                // we make an assumption here that the next call will be with the same buf size.
132                                *guard = ReaderState::Reading {
133                                    position: *position1,
134                                    op,
135                                };
136                                Poll::Pending
137                            };
138                        }
139                    }
140                }
141                state @ ReaderState::Seeking { .. } => {
142                    // should I try to recover from this or just keep it poisoned?
143                    this.state = state;
144                    break Poll::Ready(Err(io::Error::other("Can't read while seeking")));
145                }
146                ReaderState::Poisoned => {
147                    break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
148                }
149            };
150        }
151    }
152}
153
154impl tokio::io::AsyncSeek for BlobReader {
155    fn start_seek(
156        self: std::pin::Pin<&mut Self>,
157        seek_from: tokio::io::SeekFrom,
158    ) -> io::Result<()> {
159        let this = self.get_mut();
160        let guard = &mut this.state;
161        match std::mem::take(guard) {
162            ReaderState::Idle { position } => {
163                let position1 = match seek_from {
164                    SeekFrom::Start(pos) => pos,
165                    SeekFrom::Current(offset) => {
166                        position.checked_add_signed(offset).ok_or_else(|| {
167                            io::Error::new(
168                                ErrorKind::InvalidInput,
169                                "Position overflow when seeking",
170                            )
171                        })?
172                    }
173                    SeekFrom::End(_offset) => {
174                        // todo: support seeking from end if we know the size
175                        return Err(io::Error::new(
176                            ErrorKind::InvalidInput,
177                            "Seeking from end is not supported yet",
178                        ))?;
179                    }
180                };
181                *guard = ReaderState::Seeking {
182                    position: position1,
183                };
184                Ok(())
185            }
186            ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
187            ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
188            ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
189        }
190    }
191
192    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
193        let this = self.get_mut();
194        let guard = &mut this.state;
195        Poll::Ready(match std::mem::take(guard) {
196            ReaderState::Seeking { position } => {
197                *guard = ReaderState::Idle { position };
198                Ok(position)
199            }
200            ReaderState::Idle { position } => {
201                // seek calls poll_complete just in case, to finish a pending seek operation
202                // before the next seek operation. So it is poll_complete/start_seek/poll_complete
203                *guard = ReaderState::Idle { position };
204                Ok(position)
205            }
206            state @ ReaderState::Reading { .. } => {
207                // should I try to recover from this or just keep it poisoned?
208                *guard = state;
209                Err(io::Error::other("Can't seek while reading"))
210            }
211            ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
212        })
213    }
214}
215
#[cfg(test)]
mod tests {
    use bao_tree::ChunkRanges;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::*;
    use crate::{
        store::{
            fs::{
                tests::{create_n0_bao, test_data, INTERESTING_SIZES},
                FsStore,
            },
            mem::MemStore,
        },
        util::ChunkRangesExt,
    };

    /// Size in bytes of the first chunk, the only range `reader_partial` imports.
    const FIRST_CHUNK: usize = 1024;

    /// Opens a fresh reader and optionally seeks to `start`. It then reads to
    /// the end and checks both the bytes and the final stream position.
    async fn read_tail(
        blobs: &Blobs,
        hash: Hash,
        expected: &[u8],
        start: Option<usize>,
    ) -> TestResult<()> {
        let mut reader = blobs.reader(hash);
        if let Some(offset) = start {
            reader.seek(SeekFrom::Start(offset as u64)).await?;
        }
        let mut actual = Vec::new();
        reader.read_to_end(&mut actual).await?;
        assert_eq!(actual, expected[start.unwrap_or(0)..].to_vec());
        assert_eq!(reader.stream_position().await?, expected.len() as u64);
        Ok(())
    }

    async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let tag = blobs.add_bytes(data.clone()).await?;
            // Whole blob from the beginning.
            read_tail(blobs, tag.hash, &data, None).await?;
            // Second half, after an explicit seek to the middle.
            read_tail(blobs, tag.hash, &data, Some(size / 2)).await?;
        }
        Ok(())
    }

    async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let ranges = ChunkRanges::chunk(0);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            println!("importing {} bytes", bao.len());
            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;

            // Only the first chunk (or the whole blob, if smaller) is present,
            // so reading exactly that much must succeed.
            {
                let valid = size.min(FIRST_CHUNK);
                let mut reader = blobs.reader(hash);
                let mut head = vec![0u8; valid];
                reader.read_exact(&mut head).await?;
                assert_eq!(head, data[..valid]);
                assert_eq!(reader.stream_position().await?, valid as u64);
            }

            if size <= FIRST_CHUNK {
                continue;
            }

            // Everything after the first chunk is missing, so reading it must fail.
            {
                let mut reader = blobs.reader(hash);
                let mut missing = vec![0u8; size - FIRST_CHUNK];
                reader.seek(SeekFrom::Start(FIRST_CHUNK as u64)).await?;
                assert!(reader.read_exact(&mut missing).await.is_err());
            }

            // A single read spanning present and missing data must fail as a
            // whole, and the failed read must leave the stream position at 0.
            {
                let mut reader = blobs.reader(hash);
                let mut whole = vec![0u8; size];
                assert!(reader.read(&mut whole).await.is_err());
                assert_eq!(reader.stream_position().await?, 0);
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_fs() -> TestResult<()> {
        let dir = tempfile::tempdir()?;
        let store = FsStore::load(dir.path().to_owned()).await?;
        reader_partial(store.blobs()).await
    }

    #[tokio::test]
    async fn reader_partial_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_partial(store.blobs()).await
    }

    #[tokio::test]
    async fn reader_smoke_fs() -> TestResult<()> {
        let dir = tempfile::tempdir()?;
        let store = FsStore::load(dir.path().to_owned()).await?;
        reader_smoke(store.blobs()).await
    }

    #[tokio::test]
    async fn reader_smoke_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_smoke(store.blobs()).await
    }
}