// ipfrs_core/streaming.rs

//! Streaming support for reading and writing blocks
//!
//! This module provides async streaming capabilities for block data,
//! allowing efficient reading of chunked files and DAG structures.
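//!
//! # Example
//!
//! A minimal sketch of chunking a buffer and reading it back through a
//! [`MemoryBlockFetcher`] (mirrors the round-trip test at the bottom of this
//! file; paths assume the crate's public module layout, and error handling
//! and async setup are elided):
//!
//! ```ignore
//! use ipfrs_core::chunking::{Chunker, ChunkingConfig};
//! use ipfrs_core::streaming::{read_chunked_file, MemoryBlockFetcher};
//!
//! let chunker = Chunker::with_config(ChunkingConfig::with_chunk_size(1024)?);
//! let data: Vec<u8> = (0..3000).map(|i| (i % 256) as u8).collect();
//! let chunked = chunker.chunk(&data)?;
//!
//! // Serve the chunk blocks from memory, then reassemble from the root CID
//! let mut fetcher = MemoryBlockFetcher::new();
//! fetcher.add_blocks(chunked.blocks.clone());
//! let roundtrip = read_chunked_file(&fetcher, &chunked.root_cid).await?;
//! assert_eq!(roundtrip, data);
//! ```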

use crate::block::Block;
use crate::chunking::{DagLink, DagNode};
use crate::cid::Cid;
use crate::error::{Error, Result};
use bytes::Bytes;
use futures::future::BoxFuture;
use std::collections::VecDeque;
use std::io::{self, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};

/// Block fetcher trait for retrieving blocks by CID
///
/// Returning a [`BoxFuture`] (rather than an `async fn`) keeps the trait
/// object-safe; see [`MemoryBlockFetcher`] for a simple in-memory
/// implementation.
pub trait BlockFetcher: Send + Sync {
    /// Fetch a block by its CID
    fn fetch(&self, cid: Cid) -> BoxFuture<'_, Result<Block>>;
}

/// A simple in-memory block fetcher for testing
pub struct MemoryBlockFetcher {
    blocks: std::collections::HashMap<Cid, Block>,
}

impl MemoryBlockFetcher {
    /// Create a new empty memory block fetcher
    pub fn new() -> Self {
        Self {
            blocks: std::collections::HashMap::new(),
        }
    }

    /// Add a block to the fetcher
    pub fn add_block(&mut self, block: Block) {
        self.blocks.insert(*block.cid(), block);
    }

    /// Add multiple blocks
    pub fn add_blocks(&mut self, blocks: impl IntoIterator<Item = Block>) {
        for block in blocks {
            self.add_block(block);
        }
    }
}

impl Default for MemoryBlockFetcher {
    fn default() -> Self {
        Self::new()
    }
}

impl BlockFetcher for MemoryBlockFetcher {
    fn fetch(&self, cid: Cid) -> BoxFuture<'_, Result<Block>> {
        // Resolve the lookup synchronously; the returned future is
        // immediately ready with the result
        let result = self
            .blocks
            .get(&cid)
            .cloned()
            .ok_or_else(|| Error::BlockNotFound(cid.to_string()));
        Box::pin(async move { result })
    }
}

/// Synchronous block reader for reading raw block data
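///
/// # Example
///
/// A minimal sketch of reading through [`std::io::Read`] (error handling
/// elided; see `test_block_reader` below for a complete version):
///
/// ```ignore
/// use std::io::Read;
///
/// let mut reader = BlockReader::new(&block);
/// let mut out = Vec::new();
/// reader.read_to_end(&mut out)?;
/// assert_eq!(out.as_slice(), block.data().as_ref());
/// ```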
pub struct BlockReader {
    data: Bytes,
    position: usize,
}

impl BlockReader {
    /// Create a new block reader
    pub fn new(block: &Block) -> Self {
        Self {
            data: block.data().clone(),
            position: 0,
        }
    }

    /// Create from raw bytes
    pub fn from_bytes(data: Bytes) -> Self {
        Self { data, position: 0 }
    }

    /// Get the remaining bytes to read
    pub fn remaining(&self) -> usize {
        self.data.len() - self.position
    }

    /// Check if we've reached the end
    pub fn is_empty(&self) -> bool {
        self.position >= self.data.len()
    }

    /// Get total size
    pub fn len(&self) -> usize {
        self.data.len()
    }
}

impl Read for BlockReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.position >= self.data.len() {
            return Ok(0);
        }

        let remaining = &self.data[self.position..];
        let to_read = std::cmp::min(buf.len(), remaining.len());
        buf[..to_read].copy_from_slice(&remaining[..to_read]);
        self.position += to_read;
        Ok(to_read)
    }
}

/// Async block reader implementing AsyncRead
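///
/// # Example
///
/// A minimal sketch using [`tokio::io::AsyncReadExt`] (mirrors
/// `test_async_block_reader` below):
///
/// ```ignore
/// use tokio::io::AsyncReadExt;
///
/// let mut reader = AsyncBlockReader::new(&block);
/// let mut out = Vec::new();
/// reader.read_to_end(&mut out).await?;
/// assert_eq!(out.as_slice(), block.data().as_ref());
/// ```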
pub struct AsyncBlockReader {
    data: Bytes,
    position: usize,
}

impl AsyncBlockReader {
    /// Create a new async block reader
    pub fn new(block: &Block) -> Self {
        Self {
            data: block.data().clone(),
            position: 0,
        }
    }

    /// Create from raw bytes
    pub fn from_bytes(data: Bytes) -> Self {
        Self { data, position: 0 }
    }

    /// Get the remaining bytes to read
    pub fn remaining(&self) -> usize {
        self.data.len() - self.position
    }

    /// Check if we've reached the end
    pub fn is_empty(&self) -> bool {
        self.position >= self.data.len()
    }

    /// Get total size
    pub fn len(&self) -> usize {
        self.data.len()
    }
}

impl AsyncRead for AsyncBlockReader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // In-memory data is always ready; a read that adds no bytes signals EOF
        if self.position >= self.data.len() {
            return Poll::Ready(Ok(()));
        }

        let remaining = &self.data[self.position..];
        let to_read = std::cmp::min(buf.remaining(), remaining.len());
        buf.put_slice(&remaining[..to_read]);
        self.position += to_read;
        Poll::Ready(Ok(()))
    }
}

/// State for the DAG stream reader
#[allow(dead_code)]
enum DagReaderState {
    /// Reading from current chunk
    Reading { data: Bytes, position: usize },
    /// Need to fetch next chunk
    FetchingNext,
    /// Finished reading
    Done,
}

/// Async reader for DAG-structured data
///
/// Reads data from a DAG by traversing links and concatenating leaf data.
///
/// Note: this type does not implement [`AsyncRead`] yet (hence the
/// `dead_code` allowances); use [`DagChunkStream`] for chunk-at-a-time
/// reads in the meantime.
#[allow(dead_code)]
pub struct DagStreamReader<F: BlockFetcher> {
    fetcher: std::sync::Arc<F>,
    state: DagReaderState,
    pending_links: VecDeque<DagLink>,
    total_read: u64,
}

impl<F: BlockFetcher> DagStreamReader<F> {
    /// Create a new DAG stream reader
    pub fn new(fetcher: std::sync::Arc<F>, root_links: Vec<DagLink>) -> Self {
        Self {
            fetcher,
            state: DagReaderState::FetchingNext,
            pending_links: root_links.into(),
            total_read: 0,
        }
    }

    /// Create from a DAG node
    pub fn from_node(fetcher: std::sync::Arc<F>, node: &DagNode) -> Self {
        if let Some(data) = &node.data {
            // Leaf node - read directly
            Self {
                fetcher,
                state: DagReaderState::Reading {
                    data: Bytes::from(data.clone()),
                    position: 0,
                },
                pending_links: VecDeque::new(),
                total_read: 0,
            }
        } else {
            // Intermediate node - traverse links
            Self::new(fetcher, node.links.clone())
        }
    }

    /// Get the total bytes read so far
    pub fn bytes_read(&self) -> u64 {
        self.total_read
    }
}

/// Async stream that yields chunks of data from a DAG
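///
/// # Example
///
/// A minimal sketch of draining the stream chunk by chunk (fetcher and link
/// setup elided; see `test_dag_chunk_stream` below for a complete version):
///
/// ```ignore
/// let mut stream = DagChunkStream::new(fetcher, links);
/// while let Some(chunk) = stream.next_chunk().await {
///     let bytes = chunk?;
///     // consume `bytes` in file order...
/// }
/// ```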
pub struct DagChunkStream<F: BlockFetcher> {
    fetcher: std::sync::Arc<F>,
    pending_links: VecDeque<DagLink>,
}

impl<F: BlockFetcher> DagChunkStream<F> {
    /// Create a new chunk stream from links
    pub fn new(fetcher: std::sync::Arc<F>, links: Vec<DagLink>) -> Self {
        Self {
            fetcher,
            pending_links: links.into(),
        }
    }

    /// Fetch the next chunk (non-recursive implementation)
    pub async fn next_chunk(&mut self) -> Option<Result<Bytes>> {
        loop {
            let link = self.pending_links.pop_front()?;

            match self.fetcher.fetch(link.cid.0).await {
                Ok(block) => {
                    // Raw codec (0x55) marks a leaf: yield its data directly
                    if block.cid().codec() == 0x55 {
                        return Some(Ok(block.data().clone()));
                    }

                    // DAG-CBOR: parse the node and expand its links
                    match crate::ipld::Ipld::from_dag_cbor(block.data()) {
                        Ok(ipld) => {
                            if let crate::ipld::Ipld::Map(map) = ipld {
                                if let Some(crate::ipld::Ipld::List(links)) = map.get("links") {
                                    let mut new_links = Vec::new();
                                    for link_ipld in links {
                                        if let crate::ipld::Ipld::Map(link_map) = link_ipld {
                                            if let (
                                                Some(crate::ipld::Ipld::Link(cid)),
                                                Some(crate::ipld::Ipld::Integer(size)),
                                            ) = (link_map.get("cid"), link_map.get("size"))
                                            {
                                                new_links.push(DagLink::new(cid.0, *size as u64));
                                            }
                                        }
                                    }
                                    // Push child links to the front in reverse so they are
                                    // visited in their original order (depth-first)
                                    for new_link in new_links.into_iter().rev() {
                                        self.pending_links.push_front(new_link);
                                    }
                                }

                                // Yield this node's inline data before its children,
                                // which are now queued at the front
                                if let Some(crate::ipld::Ipld::Bytes(data)) = map.get("data") {
                                    return Some(Ok(Bytes::from(data.clone())));
                                }
                            }
                            // No inline data here; continue with the next pending link
                        }
                        Err(e) => return Some(Err(e)),
                    }
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }

    /// Check if there are more chunks to read
    pub fn has_more(&self) -> bool {
        !self.pending_links.is_empty()
    }
}

/// Read all data from a chunked file
///
/// Traverses the DAG depth-first so the bytes come back in file order even
/// when intermediate nodes carry inline data or the tree is unbalanced.
pub async fn read_chunked_file<F: BlockFetcher>(fetcher: &F, root_cid: &Cid) -> Result<Vec<u8>> {
    let root_block = fetcher.fetch(*root_cid).await?;

    // Raw codec (0x55) means the whole file fits in a single block
    if root_block.cid().codec() == 0x55 {
        return Ok(root_block.data().to_vec());
    }

    // Work items for the traversal: either a parsed DAG node or a link that
    // still needs to be fetched
    enum Work {
        Node(crate::ipld::Ipld),
        Fetch(Cid),
    }

    let root_ipld = crate::ipld::Ipld::from_dag_cbor(root_block.data())?;

    let mut result = Vec::new();
    // A stack (LIFO) gives depth-first order: a node's inline data is emitted
    // before its links, and each link's subtree is fully expanded before the
    // next sibling is touched
    let mut stack: Vec<Work> = vec![Work::Node(root_ipld)];

    while let Some(item) = stack.pop() {
        match item {
            Work::Fetch(cid) => {
                let block = fetcher.fetch(cid).await?;
                if block.cid().codec() == 0x55 {
                    // Raw codec - leaf data
                    result.extend_from_slice(block.data());
                } else {
                    // DAG-CBOR - parse and expand
                    stack.push(Work::Node(crate::ipld::Ipld::from_dag_cbor(block.data())?));
                }
            }
            Work::Node(crate::ipld::Ipld::Map(map)) => {
                // Inline data in this node comes before anything its links reach
                if let Some(crate::ipld::Ipld::Bytes(data)) = map.get("data") {
                    result.extend_from_slice(data);
                }

                // Push links in reverse so the first link is popped first
                if let Some(crate::ipld::Ipld::List(links)) = map.get("links") {
                    for link_ipld in links.iter().rev() {
                        if let crate::ipld::Ipld::Map(link_map) = link_ipld {
                            if let Some(crate::ipld::Ipld::Link(cid)) = link_map.get("cid") {
                                stack.push(Work::Fetch(cid.0));
                            }
                        }
                    }
                }
            }
            // Non-map nodes carry no file data
            Work::Node(_) => {}
        }
    }

    Ok(result)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::chunking::{Chunker, ChunkingConfig};
    use std::io::Read;

    #[test]
    fn test_block_reader() {
        let block = Block::new(Bytes::from_static(b"Hello, World!")).unwrap();
        let mut reader = BlockReader::new(&block);

        let mut buf = [0u8; 5];
        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b"Hello");

        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b", Wor");

        let n = reader.read(&mut buf).unwrap();
        assert_eq!(n, 3);
        assert_eq!(&buf[..3], b"ld!");
    }

    #[tokio::test]
    async fn test_async_block_reader() {
        use tokio::io::AsyncReadExt;

        let block = Block::new(Bytes::from_static(b"Hello, World!")).unwrap();
        let mut reader = AsyncBlockReader::new(&block);

        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"Hello, World!");
    }

    #[tokio::test]
    async fn test_memory_block_fetcher() {
        let block = Block::new(Bytes::from_static(b"test data")).unwrap();
        let cid = *block.cid();

        let mut fetcher = MemoryBlockFetcher::new();
        fetcher.add_block(block.clone());

        let fetched = fetcher.fetch(cid).await.unwrap();
        assert_eq!(fetched.data(), block.data());
    }

    #[tokio::test]
    async fn test_read_single_block_file() {
        let data = b"Hello, IPFS!";
        let block = Block::new(Bytes::from_static(data)).unwrap();
        let cid = *block.cid();

        let mut fetcher = MemoryBlockFetcher::new();
        fetcher.add_block(block);

        let result = read_chunked_file(&fetcher, &cid).await.unwrap();
        assert_eq!(result, data);
    }

    #[tokio::test]
    async fn test_read_chunked_file() {
        // Create chunked data
        let config = ChunkingConfig::with_chunk_size(1024).unwrap();
        let chunker = Chunker::with_config(config);

        let data: Vec<u8> = (0..3000).map(|i| (i % 256) as u8).collect();
        let chunked = chunker.chunk(&data).unwrap();

        // Add all blocks to fetcher
        let mut fetcher = MemoryBlockFetcher::new();
        fetcher.add_blocks(chunked.blocks.clone());

        // Read back
        let result = read_chunked_file(&fetcher, &chunked.root_cid)
            .await
            .unwrap();
        assert_eq!(result, data);
    }

    #[tokio::test]
    async fn test_dag_chunk_stream() {
        // Create some test blocks
        let block1 = Block::new(Bytes::from_static(b"chunk1")).unwrap();
        let block2 = Block::new(Bytes::from_static(b"chunk2")).unwrap();

        let mut fetcher = MemoryBlockFetcher::new();
        fetcher.add_block(block1.clone());
        fetcher.add_block(block2.clone());

        let links = vec![
            DagLink::new(*block1.cid(), 6),
            DagLink::new(*block2.cid(), 6),
        ];

        let mut stream = DagChunkStream::new(std::sync::Arc::new(fetcher), links);

        let chunk1 = stream.next_chunk().await.unwrap().unwrap();
        assert_eq!(chunk1.as_ref(), b"chunk1");

        let chunk2 = stream.next_chunk().await.unwrap().unwrap();
        assert_eq!(chunk2.as_ref(), b"chunk2");

        assert!(stream.next_chunk().await.is_none());
    }
}