1use crate::traits::BlockStore;
7use bytes::{Bytes, BytesMut};
8use futures::stream::Stream;
9use ipfrs_core::{Block, Cid, Error, Result};
10use std::io::SeekFrom;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
15
/// Tunables for the streaming helpers in this module.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Size in bytes of one buffered chunk. `StreamingWriter` uses it both as
    /// the initial buffer capacity and as the size at which a chunk is flushed.
    pub buffer_size: usize,
    /// Prefetch toggle. NOTE(review): not consulted by any code visible in
    /// this part of the file — confirm where it is honoured.
    pub prefetch: bool,
    /// Prefetch queue depth. NOTE(review): likewise not read by the code
    /// visible here — confirm against the consumer.
    pub prefetch_queue_size: usize,
}

impl Default for StreamConfig {
    /// Returns the defaults: a 64 KiB buffer, prefetching enabled, and a
    /// prefetch queue of four entries.
    fn default() -> Self {
        const KIB: usize = 1024;

        StreamConfig {
            prefetch_queue_size: 4,
            prefetch: true,
            buffer_size: 64 * KIB,
        }
    }
}
36
/// A half-open byte range `[start, end)` within a block.
///
/// `end == None` means "through the end of the data".
#[derive(Debug, Clone, Copy)]
pub struct ByteRange {
    /// Offset of the first byte in the range.
    pub start: u64,
    /// Exclusive end offset, or `None` for an open-ended range.
    pub end: Option<u64>,
}

impl ByteRange {
    /// Creates an open-ended range beginning at `start`.
    #[inline]
    pub fn from(start: u64) -> Self {
        Self { start, end: None }
    }

    /// Creates the range `[start, end)`.
    ///
    /// No ordering check is performed; a range with `end <= start` simply has
    /// a [`length`](Self::length) of zero.
    #[inline]
    pub fn new(start: u64, end: u64) -> Self {
        Self {
            start,
            end: Some(end),
        }
    }

    /// Creates a range of `length` bytes beginning at `start`.
    ///
    /// The end offset saturates at `u64::MAX` rather than overflowing, so an
    /// oversized `length` behaves like "as far as possible" instead of
    /// panicking (debug builds) or wrapping to a small offset (release builds).
    #[inline]
    pub fn with_length(start: u64, length: u64) -> Self {
        Self {
            start,
            end: Some(start.saturating_add(length)),
        }
    }

    /// Returns how many bytes of this range fall inside data of `total_size`
    /// bytes.
    ///
    /// The end is clamped to `total_size`; a range that starts at or beyond
    /// its clamped end yields `0`.
    #[inline]
    pub fn length(&self, total_size: u64) -> u64 {
        let end = self.end.map_or(total_size, |end| end.min(total_size));
        end.saturating_sub(self.start)
    }
}
78
79pub struct BlockReader {
81 data: Bytes,
82 position: u64,
83}
84
85impl BlockReader {
86 #[inline]
88 pub fn new(block: &Block) -> Self {
89 Self {
90 data: block.data().clone(),
91 position: 0,
92 }
93 }
94
95 #[inline]
97 pub fn from_bytes(data: Bytes) -> Self {
98 Self { data, position: 0 }
99 }
100
101 #[inline]
103 pub fn remaining(&self) -> u64 {
104 self.data.len() as u64 - self.position
105 }
106
107 pub fn size(&self) -> u64 {
109 self.data.len() as u64
110 }
111}
112
113impl AsyncRead for BlockReader {
114 fn poll_read(
115 mut self: Pin<&mut Self>,
116 _cx: &mut Context<'_>,
117 buf: &mut ReadBuf<'_>,
118 ) -> Poll<std::io::Result<()>> {
119 let pos = self.position as usize;
120 let data_len = self.data.len();
121
122 if pos >= data_len {
123 return Poll::Ready(Ok(())); }
125
126 let remaining = data_len - pos;
127 let to_read = remaining.min(buf.remaining());
128 buf.put_slice(&self.data[pos..pos + to_read]);
129 self.position += to_read as u64;
130
131 Poll::Ready(Ok(()))
132 }
133}
134
135impl AsyncSeek for BlockReader {
136 fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
137 let new_pos = match position {
138 SeekFrom::Start(pos) => pos as i64,
139 SeekFrom::End(offset) => self.data.len() as i64 + offset,
140 SeekFrom::Current(offset) => self.position as i64 + offset,
141 };
142
143 if new_pos < 0 {
144 return Err(std::io::Error::new(
145 std::io::ErrorKind::InvalidInput,
146 "seek to negative position",
147 ));
148 }
149
150 self.position = new_pos as u64;
151 Ok(())
152 }
153
154 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
155 Poll::Ready(Ok(self.position))
156 }
157}
158
159pub struct PartialBlock {
161 pub cid: Cid,
163 pub range: ByteRange,
165 pub data: Bytes,
167 pub total_size: u64,
169}
170
171impl PartialBlock {
172 pub fn is_complete(&self) -> bool {
174 self.range.start == 0 && self.data.len() as u64 == self.total_size
175 }
176}
177
178#[async_trait::async_trait]
180pub trait StreamingBlockStore: BlockStore {
181 async fn get_range(&self, cid: &Cid, range: ByteRange) -> Result<Option<PartialBlock>> {
186 let block = self.get(cid).await?;
188 match block {
189 Some(block) => {
190 let data = block.data();
191 let total_size = data.len() as u64;
192
193 let start = (range.start as usize).min(data.len());
194 let end = range
195 .end
196 .map(|e| (e as usize).min(data.len()))
197 .unwrap_or(data.len());
198
199 let slice = if start < end {
200 data.slice(start..end)
201 } else {
202 Bytes::new()
203 };
204
205 Ok(Some(PartialBlock {
206 cid: *block.cid(),
207 range,
208 data: slice,
209 total_size,
210 }))
211 }
212 None => Ok(None),
213 }
214 }
215
216 async fn reader(&self, cid: &Cid) -> Result<Option<BlockReader>> {
218 let block = self.get(cid).await?;
219 Ok(block.map(|b| BlockReader::new(&b)))
220 }
221
222 async fn get_size(&self, cid: &Cid) -> Result<Option<u64>> {
224 let block = self.get(cid).await?;
226 Ok(block.map(|b| b.size()))
227 }
228}
229
230impl<T: BlockStore> StreamingBlockStore for T {}
232
233pub struct StreamingWriter<S: BlockStore> {
235 store: Arc<S>,
236 buffer: BytesMut,
237 config: StreamConfig,
238 written_cids: Vec<Cid>,
239}
240
241impl<S: BlockStore> StreamingWriter<S> {
242 pub fn new(store: Arc<S>) -> Self {
244 Self::with_config(store, StreamConfig::default())
245 }
246
247 pub fn with_config(store: Arc<S>, config: StreamConfig) -> Self {
249 Self {
250 store,
251 buffer: BytesMut::with_capacity(config.buffer_size),
252 config,
253 written_cids: Vec::new(),
254 }
255 }
256
257 pub async fn write(&mut self, data: &[u8]) -> Result<usize> {
259 self.buffer.extend_from_slice(data);
260
261 while self.buffer.len() >= self.config.buffer_size {
263 self.flush_chunk().await?;
264 }
265
266 Ok(data.len())
267 }
268
269 pub async fn finish(mut self) -> Result<Vec<Cid>> {
271 if !self.buffer.is_empty() {
272 self.flush_chunk().await?;
273 }
274 Ok(self.written_cids)
275 }
276
277 async fn flush_chunk(&mut self) -> Result<()> {
279 let chunk_size = self.buffer.len().min(self.config.buffer_size);
280 let chunk_data = self.buffer.split_to(chunk_size).freeze();
281
282 let block = Block::new(chunk_data)?;
283 let cid = *block.cid();
284 self.store.put(&block).await?;
285 self.written_cids.push(cid);
286
287 Ok(())
288 }
289
290 pub fn written_cids(&self) -> &[Cid] {
292 &self.written_cids
293 }
294}
295
296pub struct BlockStream<S: BlockStore> {
298 store: Arc<S>,
299 cids: std::vec::IntoIter<Cid>,
300}
301
302impl<S: BlockStore + 'static> BlockStream<S> {
303 pub fn new(store: Arc<S>, cids: Vec<Cid>) -> Self {
305 Self {
306 store,
307 cids: cids.into_iter(),
308 }
309 }
310}
311
312impl<S: BlockStore + 'static> Stream for BlockStream<S> {
313 type Item = Result<Block>;
314
315 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316 match self.cids.next() {
317 Some(cid) => {
318 let store = Arc::clone(&self.store);
319 let fut = async move {
320 match store.get(&cid).await? {
321 Some(block) => Ok(block),
322 None => Err(Error::BlockNotFound(cid.to_string())),
323 }
324 };
325 let waker = cx.waker().clone();
329 tokio::spawn(async move {
330 let _ = fut.await;
331 waker.wake();
332 });
333 Poll::Pending
334 }
335 None => Poll::Ready(None),
336 }
337 }
338}
339
#[cfg(test)]
mod tests {
    use super::*;

    /// `ByteRange::length` clamps to the total size for bounded, open-ended
    /// and length-based ranges.
    #[test]
    fn test_byte_range() {
        let bounded = ByteRange::new(10, 50);
        assert_eq!(bounded.length(100), 40);
        assert_eq!(bounded.length(30), 20);

        let open_ended = ByteRange::from(80);
        assert_eq!(open_ended.length(100), 20);

        let sized = ByteRange::with_length(10, 30);
        assert_eq!(sized.length(100), 30);
    }

    /// Consecutive reads continue from where the previous one stopped.
    #[tokio::test]
    async fn test_block_reader() {
        use tokio::io::AsyncReadExt;

        let payload = Bytes::from("Hello, World!");
        let block = Block::new(payload).unwrap();
        let mut reader = BlockReader::new(&block);
        let mut chunk = vec![0u8; 5];

        let first = reader.read(&mut chunk).await.unwrap();
        assert_eq!(first, 5);
        assert_eq!(&chunk, b"Hello");

        let second = reader.read(&mut chunk).await.unwrap();
        assert_eq!(second, 5);
        assert_eq!(&chunk, b", Wor");
    }

    /// A seek from the start repositions the cursor for the next read.
    #[tokio::test]
    async fn test_block_reader_seek() {
        use tokio::io::{AsyncReadExt, AsyncSeekExt};

        let block = Block::new(Bytes::from("Hello, World!")).unwrap();
        let mut reader = BlockReader::new(&block);

        reader.seek(SeekFrom::Start(7)).await.unwrap();

        let mut chunk = vec![0u8; 5];
        let read = reader.read(&mut chunk).await.unwrap();
        assert_eq!(read, 5);
        assert_eq!(&chunk, b"World");
    }
}
390}