// ipfrs_storage/streaming.rs

//! Streaming interfaces for large block operations.
//!
//! Provides `AsyncRead`/`AsyncSeek` readers, a chunking block writer, and
//! CID-driven block streams for efficient handling of large blocks without
//! loading everything into memory.

use crate::traits::BlockStore;
use bytes::{Bytes, BytesMut};
use futures::stream::Stream;
use ipfrs_core::{Block, Cid, Error, Result};
use std::future::Future;
use std::io::SeekFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};

/// Configuration for streaming operations
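///
/// # Example
///
/// A sketch of overriding a single field while keeping the other defaults:
///
/// ```ignore
/// let config = StreamConfig {
///     buffer_size: 256 * 1024, // 256 KiB chunks
///     ..StreamConfig::default()
/// };
/// ```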
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Buffer size for streaming reads/writes
    pub buffer_size: usize,
    /// Whether to prefetch next chunks
    pub prefetch: bool,
    /// Maximum prefetch queue size
    pub prefetch_queue_size: usize,
}

impl Default for StreamConfig {
    fn default() -> Self {
        Self {
            buffer_size: 64 * 1024, // 64KB chunks
            prefetch: true,
            prefetch_queue_size: 4,
        }
    }
}

/// Range specification for partial block reads
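///
/// # Example
///
/// A small usage sketch; the arithmetic mirrors [`ByteRange::length`]:
///
/// ```ignore
/// let range = ByteRange::with_length(128, 64); // bytes 128..192
/// assert_eq!(range.length(1024), 64);
/// assert_eq!(range.length(150), 22); // clamped to the block size
/// ```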
#[derive(Debug, Clone, Copy)]
pub struct ByteRange {
    /// Start offset (inclusive)
    pub start: u64,
    /// End offset (exclusive), None means end of block
    pub end: Option<u64>,
}

impl ByteRange {
    /// Create a range from start to end of block
    #[inline]
    pub fn from(start: u64) -> Self {
        Self { start, end: None }
    }

    /// Create a range with specific start and end
    #[inline]
    pub fn new(start: u64, end: u64) -> Self {
        Self {
            start,
            end: Some(end),
        }
    }

    /// Create a range for a specific length from start
    #[inline]
    pub fn with_length(start: u64, length: u64) -> Self {
        Self {
            start,
            // Saturate so a huge length cannot overflow; the range is clamped
            // to the actual block size in `length` anyway.
            end: Some(start.saturating_add(length)),
        }
    }

    /// Get the length of this range, given the total size
    #[inline]
    pub fn length(&self, total_size: u64) -> u64 {
        let end = self.end.unwrap_or(total_size).min(total_size);
        end.saturating_sub(self.start)
    }
}

/// Async reader for a single block with seeking support
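///
/// # Example
///
/// A sketch of reading and seeking; it assumes a `Block` named `block` is
/// already at hand and runs inside a Tokio runtime.
///
/// ```ignore
/// use std::io::SeekFrom;
/// use tokio::io::{AsyncReadExt, AsyncSeekExt};
///
/// let mut reader = BlockReader::new(&block);
/// reader.seek(SeekFrom::Start(16)).await?;
/// let mut buf = Vec::new();
/// reader.read_to_end(&mut buf).await?;
/// ```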
pub struct BlockReader {
    data: Bytes,
    position: u64,
}

impl BlockReader {
    /// Create a new block reader
    #[inline]
    pub fn new(block: &Block) -> Self {
        Self {
            data: block.data().clone(),
            position: 0,
        }
    }

    /// Create from raw bytes
    #[inline]
    pub fn from_bytes(data: Bytes) -> Self {
        Self { data, position: 0 }
    }

    /// Get the number of bytes remaining after the current position
    #[inline]
    pub fn remaining(&self) -> u64 {
        // Saturate: seeking past the end is allowed and leaves nothing to read
        (self.data.len() as u64).saturating_sub(self.position)
    }

    /// Get the total size
    pub fn size(&self) -> u64 {
        self.data.len() as u64
    }
}

impl AsyncRead for BlockReader {
    fn poll_read(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let pos = self.position as usize;
        let data_len = self.data.len();

        if pos >= data_len {
            return Poll::Ready(Ok(())); // EOF
        }

        let remaining = data_len - pos;
        let to_read = remaining.min(buf.remaining());
        buf.put_slice(&self.data[pos..pos + to_read]);
        self.position += to_read as u64;

        Poll::Ready(Ok(()))
    }
}

impl AsyncSeek for BlockReader {
    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
        let new_pos = match position {
            SeekFrom::Start(pos) => pos as i64,
            SeekFrom::End(offset) => self.data.len() as i64 + offset,
            SeekFrom::Current(offset) => self.position as i64 + offset,
        };

        if new_pos < 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "seek to negative position",
            ));
        }

        self.position = new_pos as u64;
        Ok(())
    }

    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
        Poll::Ready(Ok(self.position))
    }
}

/// Partial block read result
pub struct PartialBlock {
    /// The CID of the full block
    pub cid: Cid,
    /// The requested range
    pub range: ByteRange,
    /// The actual data (slice of the block)
    pub data: Bytes,
    /// Total size of the full block
    pub total_size: u64,
}

impl PartialBlock {
    /// Check if this is the complete block
    pub fn is_complete(&self) -> bool {
        self.range.start == 0 && self.data.len() as u64 == self.total_size
    }
}

/// Extension trait for block stores with streaming capabilities
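///
/// The blanket implementation below gives every [`BlockStore`] these methods;
/// backends can override them with genuinely range- or metadata-aware queries.
///
/// # Example
///
/// A sketch of a range read; `store` stands in for any [`BlockStore`] value and
/// `cid` for a block it already holds (both are assumptions, not items defined
/// in this file).
///
/// ```ignore
/// // Fetch only the first 4 KiB of a (possibly large) block.
/// let range = ByteRange::with_length(0, 4096);
/// if let Some(partial) = store.get_range(&cid, range).await? {
///     println!("{} of {} bytes", partial.data.len(), partial.total_size);
/// }
/// ```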
#[async_trait::async_trait]
pub trait StreamingBlockStore: BlockStore {
    /// Read a partial block (range-based)
    ///
    /// Backends that support it can serve a byte range without materializing the
    /// whole block; the default implementation reads the full block and slices it.
    async fn get_range(&self, cid: &Cid, range: ByteRange) -> Result<Option<PartialBlock>> {
        // Default implementation: read full block and slice
        let block = self.get(cid).await?;
        match block {
            Some(block) => {
                let data = block.data();
                let total_size = data.len() as u64;

                let start = (range.start as usize).min(data.len());
                let end = range
                    .end
                    .map(|e| (e as usize).min(data.len()))
                    .unwrap_or(data.len());

                let slice = if start < end {
                    data.slice(start..end)
                } else {
                    Bytes::new()
                };

                Ok(Some(PartialBlock {
                    cid: *block.cid(),
                    range,
                    data: slice,
                    total_size,
                }))
            }
            None => Ok(None),
        }
    }

    /// Create an async reader for a block
    async fn reader(&self, cid: &Cid) -> Result<Option<BlockReader>> {
        let block = self.get(cid).await?;
        Ok(block.map(|b| BlockReader::new(&b)))
    }

    /// Get the size of a block without reading its contents
    async fn get_size(&self, cid: &Cid) -> Result<Option<u64>> {
        // Default: read block and get size (inefficient, backends should override)
        let block = self.get(cid).await?;
        Ok(block.map(|b| b.size()))
    }
}

// Implement StreamingBlockStore for any BlockStore
impl<T: BlockStore> StreamingBlockStore for T {}

/// Streaming block writer for building large content
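///
/// Data is buffered and flushed as fixed-size blocks (per [`StreamConfig::buffer_size`]);
/// the CIDs of the written blocks are returned by [`StreamingWriter::finish`].
///
/// # Example
///
/// A sketch of chunked writing; `store` stands in for an `Arc` of any
/// [`BlockStore`] implementation and `payload` for the caller's data (both are
/// assumptions, not items defined here).
///
/// ```ignore
/// let mut writer = StreamingWriter::new(store.clone());
/// writer.write(&payload).await?;
/// let cids = writer.finish().await?;
/// println!("wrote {} blocks", cids.len());
/// ```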
pub struct StreamingWriter<S: BlockStore> {
    store: Arc<S>,
    buffer: BytesMut,
    config: StreamConfig,
    written_cids: Vec<Cid>,
}

impl<S: BlockStore> StreamingWriter<S> {
    /// Create a new streaming writer
    pub fn new(store: Arc<S>) -> Self {
        Self::with_config(store, StreamConfig::default())
    }

    /// Create with custom configuration
    pub fn with_config(store: Arc<S>, config: StreamConfig) -> Self {
        Self {
            store,
            buffer: BytesMut::with_capacity(config.buffer_size),
            config,
            written_cids: Vec::new(),
        }
    }

    /// Write data, automatically chunking into blocks
    pub async fn write(&mut self, data: &[u8]) -> Result<usize> {
        self.buffer.extend_from_slice(data);

        // Flush complete chunks
        while self.buffer.len() >= self.config.buffer_size {
            self.flush_chunk().await?;
        }

        Ok(data.len())
    }

    /// Flush any buffered data as a final block
    pub async fn finish(mut self) -> Result<Vec<Cid>> {
        if !self.buffer.is_empty() {
            self.flush_chunk().await?;
        }
        Ok(self.written_cids)
    }

    /// Flush current buffer as a block
    async fn flush_chunk(&mut self) -> Result<()> {
        let chunk_size = self.buffer.len().min(self.config.buffer_size);
        let chunk_data = self.buffer.split_to(chunk_size).freeze();

        let block = Block::new(chunk_data)?;
        let cid = *block.cid();
        self.store.put(&block).await?;
        self.written_cids.push(cid);

        Ok(())
    }

    /// Get the CIDs written so far
    pub fn written_cids(&self) -> &[Cid] {
        &self.written_cids
    }
}

/// Stream of blocks from an iterator of CIDs
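///
/// Blocks are fetched one at a time, in order, as the stream is polled.
///
/// # Example
///
/// A sketch of draining the stream with `futures::StreamExt`; `store` and
/// `cids` are placeholders for values the caller already has.
///
/// ```ignore
/// use futures::StreamExt;
///
/// let mut blocks = BlockStream::new(store.clone(), cids);
/// while let Some(block) = blocks.next().await {
///     let block = block?;
///     println!("fetched {} ({} bytes)", block.cid(), block.data().len());
/// }
/// ```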
pub struct BlockStream<S: BlockStore> {
    store: Arc<S>,
    cids: std::vec::IntoIter<Cid>,
    /// Fetch currently in flight, kept so it can be polled across calls
    pending: Option<Pin<Box<dyn Future<Output = Result<Block>> + Send>>>,
}

impl<S: BlockStore + Send + Sync + 'static> BlockStream<S> {
    /// Create a new block stream
    pub fn new(store: Arc<S>, cids: Vec<Cid>) -> Self {
        Self {
            store,
            cids: cids.into_iter(),
            pending: None,
        }
    }
}

impl<S: BlockStore + Send + Sync + 'static> Stream for BlockStream<S> {
    type Item = Result<Block>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Finish the in-flight fetch before starting the next one
        if let Some(mut fut) = self.pending.take() {
            return match fut.as_mut().poll(cx) {
                Poll::Ready(result) => Poll::Ready(Some(result)),
                Poll::Pending => {
                    self.pending = Some(fut);
                    Poll::Pending
                }
            };
        }

        match self.cids.next() {
            Some(cid) => {
                let store = Arc::clone(&self.store);
                // Box the fetch so it can be stored and resumed on later polls
                let mut fut: Pin<Box<dyn Future<Output = Result<Block>> + Send>> =
                    Box::pin(async move {
                        match store.get(&cid).await? {
                            Some(block) => Ok(block),
                            None => Err(Error::BlockNotFound(cid.to_string())),
                        }
                    });
                match fut.as_mut().poll(cx) {
                    Poll::Ready(result) => Poll::Ready(Some(result)),
                    Poll::Pending => {
                        self.pending = Some(fut);
                        Poll::Pending
                    }
                }
            }
            None => Poll::Ready(None),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_byte_range() {
        let range = ByteRange::new(10, 50);
        assert_eq!(range.length(100), 40);
        assert_eq!(range.length(30), 20); // Clamped to size

        let range = ByteRange::from(80);
        assert_eq!(range.length(100), 20);

        let range = ByteRange::with_length(10, 30);
        assert_eq!(range.length(100), 30);
    }

    #[tokio::test]
    async fn test_block_reader() {
        use tokio::io::AsyncReadExt;

        let data = Bytes::from("Hello, World!");
        let block = Block::new(data.clone()).unwrap();
        let mut reader = BlockReader::new(&block);

        let mut buf = vec![0u8; 5];
        let n = reader.read(&mut buf).await.unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b"Hello");

        let n = reader.read(&mut buf).await.unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b", Wor");
    }

    #[tokio::test]
    async fn test_block_reader_seek() {
        use tokio::io::{AsyncReadExt, AsyncSeekExt};

        let data = Bytes::from("Hello, World!");
        let block = Block::new(data).unwrap();
        let mut reader = BlockReader::new(&block);

        reader.seek(SeekFrom::Start(7)).await.unwrap();

        let mut buf = vec![0u8; 5];
        let n = reader.read(&mut buf).await.unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b"World");
    }
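
    // Additional coverage for helpers defined in this file; nothing below
    // depends on a concrete BlockStore backend.
    #[test]
    fn test_byte_range_empty() {
        // Start beyond the block: length clamps to zero
        let range = ByteRange::new(50, 60);
        assert_eq!(range.length(40), 0);

        // Zero-length range
        let range = ByteRange::with_length(10, 0);
        assert_eq!(range.length(100), 0);
    }

    #[test]
    fn test_partial_block_is_complete() {
        let data = Bytes::from("Hello, World!");
        let block = Block::new(data.clone()).unwrap();

        let full = PartialBlock {
            cid: *block.cid(),
            range: ByteRange::from(0),
            data: data.clone(),
            total_size: data.len() as u64,
        };
        assert!(full.is_complete());

        let partial = PartialBlock {
            cid: *block.cid(),
            range: ByteRange::new(1, 5),
            data: data.slice(1..5),
            total_size: data.len() as u64,
        };
        assert!(!partial.is_complete());
    }

    #[tokio::test]
    async fn test_block_reader_remaining() {
        use tokio::io::AsyncReadExt;

        let block = Block::new(Bytes::from("Hello, World!")).unwrap();
        let mut reader = BlockReader::new(&block);
        assert_eq!(reader.size(), 13);
        assert_eq!(reader.remaining(), 13);

        let mut buf = Vec::new();
        reader.read_to_end(&mut buf).await.unwrap();
        assert_eq!(&buf, b"Hello, World!");
        assert_eq!(reader.remaining(), 0);
    }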
}