iroh_blobs/api/blobs/
reader.rs1use std::{
2 io::{self, ErrorKind, SeekFrom},
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use n0_future::StreamExt;
8
9use crate::{
10 api::{
11 blobs::{Blobs, ReaderOptions},
12 proto::ExportRangesItem,
13 },
14 Hash,
15};
16
17#[derive(Debug)]
19pub struct BlobReader {
20 blobs: Blobs,
21 options: ReaderOptions,
22 state: ReaderState,
23}
24
25#[derive(Default, derive_more::Debug)]
26enum ReaderState {
27 Idle {
28 position: u64,
29 },
30 Seeking {
31 position: u64,
32 },
33 Reading {
34 position: u64,
35 #[debug(skip)]
36 op: n0_future::boxed::BoxStream<ExportRangesItem>,
37 },
38 #[default]
39 Poisoned,
40}
41
42impl BlobReader {
43 pub(super) fn new(blobs: Blobs, options: ReaderOptions) -> Self {
44 Self {
45 blobs,
46 options,
47 state: ReaderState::Idle { position: 0 },
48 }
49 }
50
51 pub fn hash(&self) -> &Hash {
52 &self.options.hash
53 }
54}
55
56impl tokio::io::AsyncRead for BlobReader {
57 fn poll_read(
58 self: Pin<&mut Self>,
59 cx: &mut Context<'_>,
60 buf: &mut tokio::io::ReadBuf<'_>,
61 ) -> Poll<io::Result<()>> {
62 let this = self.get_mut();
63 let mut position1 = None;
64 loop {
65 let guard = &mut this.state;
66 match std::mem::take(guard) {
67 ReaderState::Idle { position } => {
68 let len = buf.remaining() as u64;
70 let end = position.checked_add(len).ok_or_else(|| {
71 io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
72 })?;
73 let stream = this
75 .blobs
76 .export_ranges(this.options.hash, position..end)
77 .stream();
78 position1 = Some(position);
79 *guard = ReaderState::Reading {
80 position,
81 op: Box::pin(stream),
82 };
83 }
84 ReaderState::Reading { position, mut op } => {
85 let position1 = position1.get_or_insert(position);
86 match op.poll_next(cx) {
87 Poll::Ready(Some(ExportRangesItem::Size(_))) => {
88 *guard = ReaderState::Reading { position, op };
89 }
90 Poll::Ready(Some(ExportRangesItem::Data(data))) => {
91 if data.offset != *position1 {
92 break Poll::Ready(Err(io::Error::other(
93 "Data offset does not match expected position",
94 )));
95 }
96 buf.put_slice(&data.data);
97 *position1 =
99 position1
100 .checked_add(data.data.len() as u64)
101 .ok_or_else(|| {
102 io::Error::new(ErrorKind::InvalidInput, "Position overflow")
103 })?;
104 *guard = ReaderState::Reading { position, op };
105 }
106 Poll::Ready(Some(ExportRangesItem::Error(err))) => {
107 *guard = ReaderState::Idle { position };
108 break Poll::Ready(Err(io::Error::other(format!(
109 "Error reading data: {err}"
110 ))));
111 }
112 Poll::Ready(None) => {
113 *guard = ReaderState::Idle {
115 position: *position1,
116 };
117 break Poll::Ready(Ok(()));
118 }
119 Poll::Pending => {
120 break if position != *position1 {
121 *guard = ReaderState::Idle {
125 position: *position1,
126 };
127 Poll::Ready(Ok(()))
128 } else {
129 *guard = ReaderState::Reading {
133 position: *position1,
134 op,
135 };
136 Poll::Pending
137 };
138 }
139 }
140 }
141 state @ ReaderState::Seeking { .. } => {
142 this.state = state;
144 break Poll::Ready(Err(io::Error::other("Can't read while seeking")));
145 }
146 ReaderState::Poisoned => {
147 break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
148 }
149 };
150 }
151 }
152}
153
154impl tokio::io::AsyncSeek for BlobReader {
155 fn start_seek(
156 self: std::pin::Pin<&mut Self>,
157 seek_from: tokio::io::SeekFrom,
158 ) -> io::Result<()> {
159 let this = self.get_mut();
160 let guard = &mut this.state;
161 match std::mem::take(guard) {
162 ReaderState::Idle { position } => {
163 let position1 = match seek_from {
164 SeekFrom::Start(pos) => pos,
165 SeekFrom::Current(offset) => {
166 position.checked_add_signed(offset).ok_or_else(|| {
167 io::Error::new(
168 ErrorKind::InvalidInput,
169 "Position overflow when seeking",
170 )
171 })?
172 }
173 SeekFrom::End(_offset) => {
174 return Err(io::Error::new(
176 ErrorKind::InvalidInput,
177 "Seeking from end is not supported yet",
178 ))?;
179 }
180 };
181 *guard = ReaderState::Seeking {
182 position: position1,
183 };
184 Ok(())
185 }
186 ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
187 ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
188 ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
189 }
190 }
191
192 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
193 let this = self.get_mut();
194 let guard = &mut this.state;
195 Poll::Ready(match std::mem::take(guard) {
196 ReaderState::Seeking { position } => {
197 *guard = ReaderState::Idle { position };
198 Ok(position)
199 }
200 ReaderState::Idle { position } => {
201 *guard = ReaderState::Idle { position };
204 Ok(position)
205 }
206 state @ ReaderState::Reading { .. } => {
207 *guard = state;
209 Err(io::Error::other("Can't seek while reading"))
210 }
211 ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
212 })
213 }
214}
215
#[cfg(all(test, feature = "fs-store"))]
mod tests {
    use bao_tree::ChunkRanges;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::*;
    use crate::{
        protocol::ChunkRangesExt,
        store::{
            fs::{
                tests::{create_n0_bao, test_data, INTERESTING_SIZES},
                FsStore,
            },
            mem::MemStore,
        },
    };

    /// Adds complete blobs of each interesting size and reads them back, once from
    /// the start and once after seeking to the midpoint.
    async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let tag = blobs.add_bytes(expected.clone()).await?;
            let total = expected.len() as u64;
            {
                // Read everything from offset 0.
                let mut reader = blobs.reader(tag.hash);
                let mut contents = Vec::new();
                reader.read_to_end(&mut contents).await?;
                assert_eq!(contents, expected);
                assert_eq!(reader.stream_position().await?, total);
            }
            {
                // Seek to the middle, then read the rest.
                let half = size / 2;
                let mut reader = blobs.reader(tag.hash);
                reader.seek(SeekFrom::Start(half as u64)).await?;
                let mut tail = Vec::new();
                reader.read_to_end(&mut tail).await?;
                assert_eq!(tail, expected[half..].to_vec());
                assert_eq!(reader.stream_position().await?, total);
            }
        }
        Ok(())
    }

    /// Imports only chunk 0 of each blob and checks that the first
    /// `min(size, 1024)` bytes can be read while reads beyond that fail.
    async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let data = test_data(size);
            let ranges = ChunkRanges::chunk(0);
            let (hash, bao) = create_n0_bao(&data, &ranges)?;
            println!("importing {} bytes", bao.len());
            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
            {
                // The imported prefix must be readable in full.
                let valid = size.min(1024);
                let mut reader = blobs.reader(hash);
                let mut head = vec![0u8; valid];
                reader.read_exact(&mut head).await?;
                assert_eq!(head, data[..valid]);
                assert_eq!(reader.stream_position().await?, valid as u64);
            }
            if size <= 1024 {
                // Nothing lies beyond the imported prefix for this size.
                continue;
            }
            {
                // Reading the part after the first 1024 bytes must fail.
                let mut reader = blobs.reader(hash);
                let mut rest = vec![0u8; size - 1024];
                reader.seek(SeekFrom::Start(1024)).await?;
                let outcome = reader.read_exact(&mut rest).await;
                assert!(outcome.is_err());
            }
            {
                // A read spanning the whole blob must fail and leave the position at 0.
                let mut reader = blobs.reader(hash);
                let mut whole = vec![0u8; size];
                let outcome = reader.read(&mut whole).await;
                assert!(outcome.is_err());
                assert_eq!(reader.stream_position().await?, 0);
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_fs() -> TestResult<()> {
        let dir = tempfile::tempdir()?;
        let store = FsStore::load(dir.path().to_owned()).await?;
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_fs() -> TestResult<()> {
        let dir = tempfile::tempdir()?;
        let store = FsStore::load(dir.path().to_owned()).await?;
        reader_smoke(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_smoke(store.blobs()).await?;
        Ok(())
    }
}