iroh_blobs/api/blobs/
reader.rs1use std::{
2 io::{self, ErrorKind, SeekFrom},
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use n0_future::StreamExt;
8
9use crate::{
10 api::{
11 blobs::{Blobs, ReaderOptions},
12 proto::ExportRangesItem,
13 },
14 Hash,
15};
16
17#[derive(Debug)]
19pub struct BlobReader {
20 blobs: Blobs,
21 options: ReaderOptions,
22 state: ReaderState,
23}
24
25#[derive(Default, derive_more::Debug)]
26enum ReaderState {
27 Idle {
28 position: u64,
29 },
30 Seeking {
31 position: u64,
32 },
33 Reading {
34 position: u64,
35 #[debug(skip)]
36 op: n0_future::boxed::BoxStream<ExportRangesItem>,
37 },
38 #[default]
39 Poisoned,
40}
41
42impl BlobReader {
43 pub(super) fn new(blobs: Blobs, options: ReaderOptions) -> Self {
44 Self {
45 blobs,
46 options,
47 state: ReaderState::Idle { position: 0 },
48 }
49 }
50
51 pub fn hash(&self) -> &Hash {
52 &self.options.hash
53 }
54}
55
56impl tokio::io::AsyncRead for BlobReader {
57 fn poll_read(
58 self: Pin<&mut Self>,
59 cx: &mut Context<'_>,
60 buf: &mut tokio::io::ReadBuf<'_>,
61 ) -> Poll<io::Result<()>> {
62 let this = self.get_mut();
63 let mut position1 = None;
64 loop {
65 let guard = &mut this.state;
66 match std::mem::take(guard) {
67 ReaderState::Idle { position } => {
68 let len = buf.remaining() as u64;
70 let end = position.checked_add(len).ok_or_else(|| {
71 io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading")
72 })?;
73 let stream = this
75 .blobs
76 .export_ranges(this.options.hash, position..end)
77 .stream();
78 position1 = Some(position);
79 *guard = ReaderState::Reading {
80 position,
81 op: Box::pin(stream),
82 };
83 }
84 ReaderState::Reading { position, mut op } => {
85 let position1 = position1.get_or_insert(position);
86 match op.poll_next(cx) {
87 Poll::Ready(Some(ExportRangesItem::Size(_))) => {
88 *guard = ReaderState::Reading { position, op };
89 }
90 Poll::Ready(Some(ExportRangesItem::Data(data))) => {
91 if data.offset != *position1 {
92 break Poll::Ready(Err(io::Error::other(
93 "Data offset does not match expected position",
94 )));
95 }
96 buf.put_slice(&data.data);
97 *position1 =
99 position1
100 .checked_add(data.data.len() as u64)
101 .ok_or_else(|| {
102 io::Error::new(ErrorKind::InvalidInput, "Position overflow")
103 })?;
104 *guard = ReaderState::Reading { position, op };
105 }
106 Poll::Ready(Some(ExportRangesItem::Error(err))) => {
107 *guard = ReaderState::Idle { position };
108 break Poll::Ready(Err(io::Error::other(format!(
109 "Error reading data: {err}"
110 ))));
111 }
112 Poll::Ready(None) => {
113 *guard = ReaderState::Idle {
115 position: *position1,
116 };
117 break Poll::Ready(Ok(()));
118 }
119 Poll::Pending => {
120 break if position != *position1 {
121 *guard = ReaderState::Idle {
125 position: *position1,
126 };
127 Poll::Ready(Ok(()))
128 } else {
129 *guard = ReaderState::Reading {
133 position: *position1,
134 op,
135 };
136 Poll::Pending
137 };
138 }
139 }
140 }
141 state @ ReaderState::Seeking { .. } => {
142 this.state = state;
144 break Poll::Ready(Err(io::Error::other("Can't read while seeking")));
145 }
146 ReaderState::Poisoned => {
147 break Poll::Ready(Err(io::Error::other("Reader is poisoned")));
148 }
149 };
150 }
151 }
152}
153
154impl tokio::io::AsyncSeek for BlobReader {
155 fn start_seek(
156 self: std::pin::Pin<&mut Self>,
157 seek_from: tokio::io::SeekFrom,
158 ) -> io::Result<()> {
159 let this = self.get_mut();
160 let guard = &mut this.state;
161 match std::mem::take(guard) {
162 ReaderState::Idle { position } => {
163 let position1 = match seek_from {
164 SeekFrom::Start(pos) => pos,
165 SeekFrom::Current(offset) => {
166 position.checked_add_signed(offset).ok_or_else(|| {
167 io::Error::new(
168 ErrorKind::InvalidInput,
169 "Position overflow when seeking",
170 )
171 })?
172 }
173 SeekFrom::End(_offset) => {
174 return Err(io::Error::new(
176 ErrorKind::InvalidInput,
177 "Seeking from end is not supported yet",
178 ))?;
179 }
180 };
181 *guard = ReaderState::Seeking {
182 position: position1,
183 };
184 Ok(())
185 }
186 ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")),
187 ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")),
188 ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
189 }
190 }
191
192 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
193 let this = self.get_mut();
194 let guard = &mut this.state;
195 Poll::Ready(match std::mem::take(guard) {
196 ReaderState::Seeking { position } => {
197 *guard = ReaderState::Idle { position };
198 Ok(position)
199 }
200 ReaderState::Idle { position } => {
201 *guard = ReaderState::Idle { position };
204 Ok(position)
205 }
206 state @ ReaderState::Reading { .. } => {
207 *guard = state;
209 Err(io::Error::other("Can't seek while reading"))
210 }
211 ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")),
212 })
213 }
214}
215
#[cfg(test)]
mod tests {
    use bao_tree::ChunkRanges;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use super::*;
    use crate::{
        store::{
            fs::{
                tests::{create_n0_bao, test_data, INTERESTING_SIZES},
                FsStore,
            },
            mem::MemStore,
        },
        util::ChunkRangesExt,
    };

    /// Byte length of chunk 0, the only range imported by `reader_partial`.
    const FIRST_CHUNK: usize = 1024;

    /// Full reads and reads after a seek return exactly the stored bytes.
    async fn reader_smoke(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let tag = blobs.add_bytes(expected.clone()).await?;
            // Whole blob from the beginning.
            {
                let mut whole = blobs.reader(tag.hash);
                let mut out = Vec::new();
                whole.read_to_end(&mut out).await?;
                assert_eq!(out, expected);
                assert_eq!(whole.stream_position().await?, expected.len() as u64);
            }
            // Second half, after an explicit seek.
            {
                let mut tail = blobs.reader(tag.hash);
                let half = size / 2;
                tail.seek(SeekFrom::Start(half as u64)).await?;
                let mut out = Vec::new();
                tail.read_to_end(&mut out).await?;
                assert_eq!(out, expected[half..].to_vec());
                assert_eq!(tail.stream_position().await?, expected.len() as u64);
            }
        }
        Ok(())
    }

    /// With only the first chunk imported, reads inside it succeed and reads
    /// touching missing data fail.
    async fn reader_partial(blobs: &Blobs) -> TestResult<()> {
        for size in INTERESTING_SIZES {
            let expected = test_data(size);
            let ranges = ChunkRanges::chunk(0);
            let (hash, bao) = create_n0_bao(&expected, &ranges)?;
            println!("importing {} bytes", bao.len());
            blobs.import_bao_bytes(hash, ranges.clone(), bao).await?;
            // The available prefix reads back correctly.
            {
                let mut reader = blobs.reader(hash);
                let available = size.min(FIRST_CHUNK);
                let mut head = vec![0u8; available];
                reader.read_exact(&mut head).await?;
                assert_eq!(head, expected[..available]);
                assert_eq!(reader.stream_position().await?, available as u64);
            }
            if size <= FIRST_CHUNK {
                continue;
            }
            // Everything after the first chunk is missing.
            {
                let mut reader = blobs.reader(hash);
                let mut missing = vec![0u8; size - FIRST_CHUNK];
                reader.seek(SeekFrom::Start(FIRST_CHUNK as u64)).await?;
                let outcome = reader.read_exact(&mut missing).await;
                assert!(outcome.is_err());
            }
            // A single read spanning the gap fails and keeps the position at 0.
            {
                let mut reader = blobs.reader(hash);
                let mut all = vec![0u8; size];
                let outcome = reader.read(&mut all).await;
                assert!(outcome.is_err());
                assert_eq!(reader.stream_position().await?, 0);
            }
        }
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_fs() -> TestResult<()> {
        let dir = tempfile::tempdir()?;
        let store = FsStore::load(dir.path().to_owned()).await?;
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_partial_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_partial(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_fs() -> TestResult<()> {
        let dir = tempfile::tempdir()?;
        let store = FsStore::load(dir.path().to_owned()).await?;
        reader_smoke(store.blobs()).await?;
        Ok(())
    }

    #[tokio::test]
    async fn reader_smoke_memory() -> TestResult<()> {
        let store = MemStore::new();
        reader_smoke(store.blobs()).await?;
        Ok(())
    }
}