iroh_blobs/api/blobs/reader.rs

use std::{
    io::{self, ErrorKind, SeekFrom},
    pin::Pin,
    task::{Context, Poll},
};

use n0_future::StreamExt;

use crate::{
    api::{
        blobs::{Blobs, ReaderOptions},
        proto::ExportRangesItem,
    },
    Hash,
};

/// A reader for blobs that implements `AsyncRead` and `AsyncSeek`.
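///
/// # Example
///
/// A minimal usage sketch, assuming a `Blobs` handle and the `Hash` of a blob that is
/// present in the store (not compiled as a doctest):
///
/// ```ignore
/// use std::io::SeekFrom;
/// use tokio::io::{AsyncReadExt, AsyncSeekExt};
///
/// let mut reader = blobs.reader(hash);
/// // seek to an offset, then read everything from there to the end
/// reader.seek(SeekFrom::Start(1024)).await?;
/// let mut buf = Vec::new();
/// reader.read_to_end(&mut buf).await?;
/// ```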
#[derive(Debug)]
pub struct BlobReader {
    blobs: Blobs,
    options: ReaderOptions,
    state: ReaderState,
}

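/// Internal state machine of a [`BlobReader`].
///
/// - `Idle`: no operation in flight, `position` is the offset of the next read.
/// - `Seeking`: `start_seek` has recorded a new position that `poll_complete` has not yet
///   acknowledged.
/// - `Reading`: an `export_ranges` operation is in flight, streaming data starting at `position`.
/// - `Poisoned`: default placeholder left behind by `std::mem::take` while a state transition is
///   in progress; observing it in a later call means a previous call bailed out without
///   restoring the state.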
#[derive(Default, derive_more::Debug)]
enum ReaderState {
    Idle {
        position: u64,
    },
    Seeking {
        position: u64,
    },
    Reading {
        position: u64,
        #[debug(skip)]
        op: n0_future::boxed::BoxStream<ExportRangesItem>,
    },
    #[default]
    Poisoned,
}

impl BlobReader {
    pub(super) fn new(blobs: Blobs, options: ReaderOptions) -> Self {
        Self {
            blobs,
            options,
            state: ReaderState::Idle { position: 0 },
        }
    }

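    /// Returns the hash of the blob this reader reads from.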
    pub fn hash(&self) -> &Hash {
        &self.options.hash
    }
}

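// Reading is implemented on top of `export_ranges`: whenever `poll_read` is called in the
// `Idle` state, an export op covering exactly `buf.remaining()` bytes from the current position
// is started, and its stream is polled, copying data items into `buf`, until the op completes,
// fails, or would block.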
impl tokio::io::AsyncRead for BlobReader {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this = self.get_mut();
        let mut position1 = None;
        loop {
            let guard = &mut this.state;
            match std::mem::take(guard) {
                ReaderState::Idle { position } => {
                    // todo: read until next page boundary instead of fixed size
                    let len = buf.remaining() as u64;
                    let end = position.checked_add(len).ok_or_else(|| {
                        io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
                    })?;
                    // start the export op for the entire size of the buffer, and convert to a stream
                    let stream = this
                        .blobs
                        .export_ranges(this.options.hash, position..end)
                        .stream();
                    position1 = Some(position);
                    *guard = ReaderState::Reading {
                        position,
                        op: Box::pin(stream),
                    };
                }
                ReaderState::Reading { position, mut op } => {
                    let position1 = position1.get_or_insert(position);
                    match op.poll_next(cx) {
                        Poll::Ready(Some(ExportRangesItem::Size(_))) => {
                            *guard = ReaderState::Reading { position, op };
                        }
                        Poll::Ready(Some(ExportRangesItem::Data(data))) => {
                            if data.offset != *position1 {
                                break Poll::Ready(Err(io::Error::other(
                                    "Data offset does not match expected position",
                                )));
                            }
                            buf.put_slice(&data.data);
                            // update only the local position1, not the position in the state.
                            *position1 =
                                position1
                                    .checked_add(data.data.len() as u64)
                                    .ok_or_else(|| {
                                        io::Error::new(ErrorKind::InvalidInput, "Position overflow")
                                    })?;
                            *guard = ReaderState::Reading { position, op };
                        }
                        Poll::Ready(Some(ExportRangesItem::Error(err))) => {
                            *guard = ReaderState::Idle { position };
                            break Poll::Ready(Err(io::Error::other(format!(
                                "Error reading data: {err}"
                            ))));
                        }
                        Poll::Ready(None) => {
                            // done with the stream, go back to idle.
                            *guard = ReaderState::Idle {
                                position: *position1,
                            };
                            break Poll::Ready(Ok(()));
                        }
                        Poll::Pending => {
                            break if position != *position1 {
                                // we read some data, so we need to abort the op.
                                //
                                // we can't be sure we will be called with the same buf size next time.
                                *guard = ReaderState::Idle {
                                    position: *position1,
                                };
                                Poll::Ready(Ok(()))
                            } else {
                                // nothing was read yet, we remain in the reading state
                                //
                                // we make an assumption here that the next call will be with the same buf size.
                                *guard = ReaderState::Reading {
                                    position: *position1,
                                    op,
                                };
                                Poll::Pending
                            };
                        }
                    }
                }
                state @ ReaderState::Seeking { .. } => {
                    // should I try to recover from this or just keep it poisoned?
                    this.state = state;
                    break Poll::Ready(Err(io::Error::other("Can't read while seeking")));
                }
                ReaderState::Poisoned => {
                    break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
                }
            };
        }
    }
}

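// Seeking follows the two-phase `AsyncSeek` contract: `start_seek` records the target position
// in the `Seeking` state, and a subsequent `poll_complete` resolves it back to `Idle`. Seeks are
// rejected while a read op is in flight.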
impl tokio::io::AsyncSeek for BlobReader {
    fn start_seek(
        self: std::pin::Pin<&mut Self>,
        seek_from: tokio::io::SeekFrom,
    ) -> io::Result<()> {
        let this = self.get_mut();
        let guard = &mut this.state;
        match std::mem::take(guard) {
            ReaderState::Idle { position } => {
                let position1 = match seek_from {
                    SeekFrom::Start(pos) => pos,
                    SeekFrom::Current(offset) => {
                        position.checked_add_signed(offset).ok_or_else(|| {
                            io::Error::new(
                                ErrorKind::InvalidInput,
                                "Position overflow when seeking",
                            )
                        })?
                    }
                    SeekFrom::End(_offset) => {
                        // todo: support seeking from end if we know the size
                        return Err(io::Error::new(
                            ErrorKind::InvalidInput,
                            "Seeking from end is not supported yet",
                        ));
                    }
                };
                *guard = ReaderState::Seeking {
                    position: position1,
                };
                Ok(())
            }
            ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
            ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
            ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
        }
    }

    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
        let this = self.get_mut();
        let guard = &mut this.state;
        Poll::Ready(match std::mem::take(guard) {
            ReaderState::Seeking { position } => {
                *guard = ReaderState::Idle { position };
                Ok(position)
            }
            ReaderState::Idle { position } => {
                // AsyncSeekExt::seek calls poll_complete first, just in case, to finish a pending
                // seek operation before starting the next one. So the call sequence is
                // poll_complete/start_seek/poll_complete.
                *guard = ReaderState::Idle { position };
                Ok(position)
            }
            state @ ReaderState::Reading { .. } => {
                // should I try to recover from this or just keep it poisoned?
                *guard = state;
                Err(io::Error::other("Can't seek while reading"))
            }
            ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
        })
    }
}

#[cfg(test)]
#[cfg(feature = "fs-store")]
mod tests {
    use bao_tree::ChunkRanges;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::*;
    use crate::{
        protocol::ChunkRangesExt,
        store::{
            fs::{
                tests::{create_n0_bao, test_data, INTERESTING_SIZES},
                FsStore,
            },
            mem::MemStore,
        },
    };

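    /// Adds complete blobs of various sizes, then checks reading the whole blob as well as
    /// reading after a seek to the middle.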
    async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let tag = blobs.add_bytes(data.clone()).await?;
            // read all
            {
                let mut reader = blobs.reader(tag.hash);
                let mut buf = Vec::new();
                reader.read_to_end(&mut buf).await?;
                assert_eq!(buf, data);
                let pos = reader.stream_position().await?;
                assert_eq!(pos, data.len() as u64);
            }
            // seek to mid and read all
            {
                let mut reader = blobs.reader(tag.hash);
                let mid = size / 2;
                reader.seek(SeekFrom::Start(mid as u64)).await?;
                let mut buf = Vec::new();
                reader.read_to_end(&mut buf).await?;
                assert_eq!(buf, data[mid..].to_vec());
                let pos = reader.stream_position().await?;
                assert_eq!(pos, data.len() as u64);
            }
        }
        Ok(())
    }

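    /// Imports only the first chunk of each test blob and checks that reads within the valid
    /// range succeed while reads into the missing data fail.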
    async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let ranges = ChunkRanges::chunk(0);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            println!("importing {} bytes", bao.len());
            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
            // read the first chunk or the entire blob, whichever is smaller
            // this should work!
            {
                let mut reader = blobs.reader(hash);
                let valid = size.min(1024);
                let mut buf = vec![0u8; valid];
                reader.read_exact(&mut buf).await?;
                assert_eq!(buf, data[..valid]);
                let pos = reader.stream_position().await?;
                assert_eq!(pos, valid as u64);
            }
            if size > 1024 {
                // read the part we don't have - should immediately return an error
                {
                    let mut reader = blobs.reader(hash);
                    let mut rest = vec![0u8; size - 1024];
                    reader.seek(SeekFrom::Start(1024)).await?;
                    let res = reader.read_exact(&mut rest).await;
                    assert!(res.is_err());
                }
                // read crossing the end of the blob - should return an error despite
                // the first bytes being valid.
                // A read that fails should not update the stream position.
                {
                    let mut reader = blobs.reader(hash);
                    let mut buf = vec![0u8; size];
                    let res = reader.read(&mut buf).await;
                    assert!(res.is_err());
                    let pos = reader.stream_position().await?;
                    assert_eq!(pos, 0);
                }
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_fs() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let store = FsStore::load(testdir.path().to_owned()).await?;
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_fs() -> TestResult<()> {
        let testdir = tempfile::tempdir()?;
        let store = FsStore::load(testdir.path().to_owned()).await?;
        reader_smoke(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_smoke(store.blobs()).await?;
        Ok(())
    }
}