1use crate::block::Block;
7use crate::chunking::{DagLink, DagNode};
8use crate::cid::Cid;
9use crate::error::{Error, Result};
10use bytes::Bytes;
11use futures::future::BoxFuture;
12use std::collections::VecDeque;
13use std::io::{self, Read};
14use std::pin::Pin;
15use std::task::{Context, Poll};
16use tokio::io::{AsyncRead, ReadBuf};
17
18pub trait BlockFetcher: Send + Sync {
20 fn fetch(&self, cid: Cid) -> BoxFuture<'_, Result<Block>>;
22}
23
24pub struct MemoryBlockFetcher {
26 blocks: std::collections::HashMap<Cid, Block>,
27}
28
29impl MemoryBlockFetcher {
30 pub fn new() -> Self {
32 Self {
33 blocks: std::collections::HashMap::new(),
34 }
35 }
36
37 pub fn add_block(&mut self, block: Block) {
39 self.blocks.insert(*block.cid(), block);
40 }
41
42 pub fn add_blocks(&mut self, blocks: impl IntoIterator<Item = Block>) {
44 for block in blocks {
45 self.add_block(block);
46 }
47 }
48}
49
50impl Default for MemoryBlockFetcher {
51 fn default() -> Self {
52 Self::new()
53 }
54}
55
56impl BlockFetcher for MemoryBlockFetcher {
57 fn fetch(&self, cid: Cid) -> BoxFuture<'_, Result<Block>> {
58 let result = self
59 .blocks
60 .get(&cid)
61 .cloned()
62 .ok_or_else(|| Error::BlockNotFound(cid.to_string()));
63 Box::pin(async move { result })
64 }
65}
66
67pub struct BlockReader {
69 data: Bytes,
70 position: usize,
71}
72
73impl BlockReader {
74 pub fn new(block: &Block) -> Self {
76 Self {
77 data: block.data().clone(),
78 position: 0,
79 }
80 }
81
82 pub fn from_bytes(data: Bytes) -> Self {
84 Self { data, position: 0 }
85 }
86
87 pub fn remaining(&self) -> usize {
89 self.data.len() - self.position
90 }
91
92 pub fn is_empty(&self) -> bool {
94 self.position >= self.data.len()
95 }
96
97 pub fn len(&self) -> usize {
99 self.data.len()
100 }
101}
102
103impl Read for BlockReader {
104 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
105 if self.position >= self.data.len() {
106 return Ok(0);
107 }
108
109 let remaining = &self.data[self.position..];
110 let to_read = std::cmp::min(buf.len(), remaining.len());
111 buf[..to_read].copy_from_slice(&remaining[..to_read]);
112 self.position += to_read;
113 Ok(to_read)
114 }
115}
116
117pub struct AsyncBlockReader {
119 data: Bytes,
120 position: usize,
121}
122
123impl AsyncBlockReader {
124 pub fn new(block: &Block) -> Self {
126 Self {
127 data: block.data().clone(),
128 position: 0,
129 }
130 }
131
132 pub fn from_bytes(data: Bytes) -> Self {
134 Self { data, position: 0 }
135 }
136
137 pub fn remaining(&self) -> usize {
139 self.data.len() - self.position
140 }
141
142 pub fn is_empty(&self) -> bool {
144 self.position >= self.data.len()
145 }
146
147 pub fn len(&self) -> usize {
149 self.data.len()
150 }
151}
152
153impl AsyncRead for AsyncBlockReader {
154 fn poll_read(
155 mut self: Pin<&mut Self>,
156 _cx: &mut Context<'_>,
157 buf: &mut ReadBuf<'_>,
158 ) -> Poll<io::Result<()>> {
159 if self.position >= self.data.len() {
160 return Poll::Ready(Ok(()));
161 }
162
163 let remaining = &self.data[self.position..];
164 let to_read = std::cmp::min(buf.remaining(), remaining.len());
165 buf.put_slice(&remaining[..to_read]);
166 self.position += to_read;
167 Poll::Ready(Ok(()))
168 }
169}
170
171#[allow(dead_code)]
173enum DagReaderState {
174 Reading { data: Bytes, position: usize },
176 FetchingNext,
178 Done,
180}
181
182#[allow(dead_code)]
186pub struct DagStreamReader<F: BlockFetcher> {
187 fetcher: std::sync::Arc<F>,
188 state: DagReaderState,
189 pending_links: VecDeque<DagLink>,
190 total_read: u64,
191}
192
193impl<F: BlockFetcher> DagStreamReader<F> {
194 pub fn new(fetcher: std::sync::Arc<F>, root_links: Vec<DagLink>) -> Self {
196 Self {
197 fetcher,
198 state: DagReaderState::FetchingNext,
199 pending_links: root_links.into(),
200 total_read: 0,
201 }
202 }
203
204 pub fn from_node(fetcher: std::sync::Arc<F>, node: &DagNode) -> Self {
206 if let Some(data) = &node.data {
207 Self {
209 fetcher,
210 state: DagReaderState::Reading {
211 data: Bytes::from(data.clone()),
212 position: 0,
213 },
214 pending_links: VecDeque::new(),
215 total_read: 0,
216 }
217 } else {
218 Self::new(fetcher, node.links.clone())
220 }
221 }
222
223 pub fn bytes_read(&self) -> u64 {
225 self.total_read
226 }
227}
228
229pub struct DagChunkStream<F: BlockFetcher> {
231 fetcher: std::sync::Arc<F>,
232 pending_links: VecDeque<DagLink>,
233}
234
235impl<F: BlockFetcher> DagChunkStream<F> {
236 pub fn new(fetcher: std::sync::Arc<F>, links: Vec<DagLink>) -> Self {
238 Self {
239 fetcher,
240 pending_links: links.into(),
241 }
242 }
243
244 pub async fn next_chunk(&mut self) -> Option<Result<Bytes>> {
246 loop {
247 let link = self.pending_links.pop_front()?;
248
249 match self.fetcher.fetch(link.cid.0).await {
250 Ok(block) => {
251 if block.cid().codec() == 0x55 {
254 return Some(Ok(block.data().clone()));
255 }
256
257 match crate::ipld::Ipld::from_dag_cbor(block.data()) {
259 Ok(ipld) => {
260 if let crate::ipld::Ipld::Map(map) = ipld {
261 if let Some(crate::ipld::Ipld::List(links)) = map.get("links") {
263 let mut new_links = Vec::new();
265 for link_ipld in links {
266 if let crate::ipld::Ipld::Map(link_map) = link_ipld {
267 if let (
268 Some(crate::ipld::Ipld::Link(cid)),
269 Some(crate::ipld::Ipld::Integer(size)),
270 ) = (link_map.get("cid"), link_map.get("size"))
271 {
272 new_links.push(DagLink::new(cid.0, *size as u64));
273 }
274 }
275 }
276 for new_link in new_links.into_iter().rev() {
278 self.pending_links.push_front(new_link);
279 }
280 }
281
282 if let Some(crate::ipld::Ipld::Bytes(data)) = map.get("data") {
284 return Some(Ok(Bytes::from(data.clone())));
285 }
286 }
287 }
289 Err(e) => return Some(Err(e)),
290 }
291 }
292 Err(e) => return Some(Err(e)),
293 }
294 }
295 }
296
297 pub fn has_more(&self) -> bool {
299 !self.pending_links.is_empty()
300 }
301}
302
303pub async fn read_chunked_file<F: BlockFetcher>(fetcher: &F, root_cid: &Cid) -> Result<Vec<u8>> {
305 let root_block = fetcher.fetch(*root_cid).await?;
306
307 if root_block.cid().codec() == 0x55 {
309 return Ok(root_block.data().to_vec());
311 }
312
313 let root_ipld = crate::ipld::Ipld::from_dag_cbor(root_block.data())?;
315
316 let mut result = Vec::new();
317 let mut queue: VecDeque<crate::ipld::Ipld> = VecDeque::new();
318 queue.push_back(root_ipld);
319
320 while let Some(ipld) = queue.pop_front() {
321 if let crate::ipld::Ipld::Map(map) = ipld {
322 if let Some(crate::ipld::Ipld::Bytes(data)) = map.get("data") {
324 result.extend_from_slice(data);
325 }
326
327 if let Some(crate::ipld::Ipld::List(links)) = map.get("links") {
329 for link_ipld in links {
330 if let crate::ipld::Ipld::Map(link_map) = link_ipld {
331 if let Some(crate::ipld::Ipld::Link(cid)) = link_map.get("cid") {
332 let block = fetcher.fetch(cid.0).await?;
333
334 if block.cid().codec() == 0x55 {
335 result.extend_from_slice(block.data());
337 } else {
338 let child_ipld = crate::ipld::Ipld::from_dag_cbor(block.data())?;
340 queue.push_back(child_ipld);
341 }
342 }
343 }
344 }
345 }
346 }
347 }
348
349 Ok(result)
350}
351
#[cfg(test)]
mod tests {
    //! Unit tests for the block readers, the in-memory fetcher and the DAG
    //! reading helpers.

    use super::*;
    use crate::chunking::{Chunker, ChunkingConfig};
    use std::io::Read;

    /// Successive reads through a 5-byte buffer walk the payload in order.
    #[test]
    fn test_block_reader() {
        let block = Block::new(Bytes::from_static(b"Hello, World!")).unwrap();
        let mut reader = BlockReader::new(&block);
        let mut buf = [0u8; 5];

        let expected_reads: [&[u8]; 3] = [b"Hello", b", Wor", b"ld!"];
        for want in expected_reads.iter() {
            let n = reader.read(&mut buf).unwrap();
            assert_eq!(n, want.len());
            assert_eq!(&buf[..n], *want);
        }
    }

    /// `read_to_end` drains the async reader completely.
    #[tokio::test]
    async fn test_async_block_reader() {
        use tokio::io::AsyncReadExt;

        let block = Block::new(Bytes::from_static(b"Hello, World!")).unwrap();
        let mut reader = AsyncBlockReader::new(&block);

        let mut collected = Vec::new();
        reader.read_to_end(&mut collected).await.unwrap();
        assert_eq!(collected, b"Hello, World!");
    }

    /// A stored block can be fetched back by its CID.
    #[tokio::test]
    async fn test_memory_block_fetcher() {
        let original = Block::new(Bytes::from_static(b"test data")).unwrap();
        let cid = *original.cid();

        let mut store = MemoryBlockFetcher::new();
        store.add_block(original.clone());

        let fetched = store.fetch(cid).await.unwrap();
        assert_eq!(fetched.data(), original.data());
    }

    /// A file that fits into one block is returned verbatim.
    #[tokio::test]
    async fn test_read_single_block_file() {
        let payload = b"Hello, IPFS!";
        let block = Block::new(Bytes::from_static(payload)).unwrap();
        let cid = *block.cid();

        let mut store = MemoryBlockFetcher::new();
        store.add_block(block);

        let contents = read_chunked_file(&store, &cid).await.unwrap();
        assert_eq!(contents, payload);
    }

    /// A file split into several chunks is reassembled byte for byte.
    #[tokio::test]
    async fn test_read_chunked_file() {
        let chunker = Chunker::with_config(ChunkingConfig::with_chunk_size(1024).unwrap());

        let payload: Vec<u8> = (0..3000).map(|i| (i % 256) as u8).collect();
        let chunked = chunker.chunk(&payload).unwrap();

        let mut store = MemoryBlockFetcher::new();
        store.add_blocks(chunked.blocks.clone());

        let contents = read_chunked_file(&store, &chunked.root_cid)
            .await
            .unwrap();
        assert_eq!(contents, payload);
    }

    /// The chunk stream yields leaf blocks in link order, then ends.
    #[tokio::test]
    async fn test_dag_chunk_stream() {
        let first = Block::new(Bytes::from_static(b"chunk1")).unwrap();
        let second = Block::new(Bytes::from_static(b"chunk2")).unwrap();

        let mut store = MemoryBlockFetcher::new();
        store.add_block(first.clone());
        store.add_block(second.clone());

        let links = vec![
            DagLink::new(*first.cid(), 6),
            DagLink::new(*second.cid(), 6),
        ];
        let mut stream = DagChunkStream::new(std::sync::Arc::new(store), links);

        let got_first = stream.next_chunk().await.unwrap().unwrap();
        assert_eq!(got_first.as_ref(), b"chunk1");

        let got_second = stream.next_chunk().await.unwrap().unwrap();
        assert_eq!(got_second.as_ref(), b"chunk2");

        assert!(stream.next_chunk().await.is_none());
    }
}