// monoutils_store/implementations/layouts/flat.rs

1use std::{
2    cmp::Ordering,
3    io::{Error, ErrorKind, SeekFrom},
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use aliasable::boxed::AliasableBox;
9use async_stream::try_stream;
10use bytes::Bytes;
11use futures::{ready, stream::BoxStream, Future, StreamExt};
12use libipld::Cid;
13use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
14
15use crate::{
16    IpldStore, Layout, LayoutError, LayoutSeekable, MerkleNode, SeekableReader, StoreError,
17    StoreResult,
18};
19
20//--------------------------------------------------------------------------------------------------
21// Types
22//--------------------------------------------------------------------------------------------------
23
/// Layout that stores data as one flat row of chunks hanging off a single merkle node.
///
/// Every chunk is a direct child of the root node; there are no intermediate tree levels.
///
/// ```txt
///                      ┌─────────────┐
///                      │ Merkle Node │
///                      └──────┬──────┘
///                             │
///      ┌───────────────┬──────┴────────┬─────────────────┐
///      │               │               │                 │
///  0   ▼       1       ▼         2     ▼        3        ▼
/// ┌──┬──┬──┐  ┌──┬──┬──┬──┬──┐  ┌──┬──┬──┬──┐  ┌──┬──┬──┬──┬──┬──┐
/// │0 │1 │2 │  │3 │4 │5 │6 │7 │  │8 │9 │10│11│  │12│13│14│15│16│17│
/// └──┴──┴──┘  └──┴──┴──┴──┴──┘  └──┴──┴──┴──┘  └──┴──┴──┴──┴──┴──┘
/// ```
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FlatLayout {}
40
/// A reader for the flat DAG layout.
///
/// The reader maintains three state variables:
///
/// - The current byte position, `byte_cursor`.
/// - The index of the current chunk within the node's children array, `chunk_index`.
/// - The distance (in bytes) of the current chunk index from the start, `chunk_distance`.
///
/// These state variables are used to determine the current chunk to read from and the byte position
/// within the chunk to read from. It basically enables seeking to any byte position within the
/// chunk array.
///
/// ```txt
///  Chunk Index    = 1
///  Chunk Distance = 3
///            │
///            │
///  0         ▼ 1                 2
/// ┌──┬──┬──┐  ┌──┬──┬──┬──┬──┐  ┌──┬──┬──┬──┐
/// │0 │1 │2 │  │3 │4 │5 │6 │7 │  │8 │9 │10│11│
/// └──┴──┴──┘  └──┴──┴──┴──┴──┘  └──┴──┴──┴──┘
///                    ▲
///                    │
///                    │
///                Byte Cursor = 5
/// ```
///
/// ## Safety invariants
///
/// `get_raw_block_fn` is typed as `'static`, but the futures stored in it are created by
/// lifetime-erasing `transmute`s and may borrow `store` (and data reached through `node`). Those
/// two fields live on the heap behind `AliasableBox`, so the borrows stay valid when the reader
/// struct itself is moved. The field order below is load-bearing and must not be changed.
pub struct FlatLayoutReader<S>
where
    S: IpldStore,
{
    /// The current byte position.
    byte_cursor: u64,

    /// The index of the current chunk within the node's children array.
    chunk_index: u64,

    /// The distance (in bytes) of the current chunk index from the start.
    chunk_distance: u64,

    /// The pending future that yields the next bytes to hand out to readers.
    ///
    /// ## Important
    ///
    /// May hold references into `store` and `node`. It is declared *before* those fields (though
    /// not first in the struct) because Rust drops struct fields in declaration order, which
    /// guarantees this future is dropped before the data it borrows.
    get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>>,

    /// The store associated with the reader.
    ///
    /// ## Warning
    ///
    /// Field must not be moved as it is referenced by `get_raw_block_fn`.
    store: AliasableBox<S>,

    /// The node that the reader is reading from.
    ///
    /// ## Warning
    ///
    /// Field must not be moved as it is referenced by `get_raw_block_fn`.
    node: AliasableBox<MerkleNode>,
}
102
103//--------------------------------------------------------------------------------------------------
104// Methods
105//--------------------------------------------------------------------------------------------------
106
107impl FlatLayout {
108    /// Create a new flat DAG layout.
109    pub fn new() -> Self {
110        FlatLayout {}
111    }
112}
113
114impl<S> FlatLayoutReader<S>
115where
116    S: IpldStore + Sync,
117{
118    /// Create a new flat DAG reader.
119    fn new(node: MerkleNode, store: S) -> StoreResult<Self> {
120        // Store node and store in the heap and make them aliasable.
121        let node = AliasableBox::from_unique(Box::new(node));
122        let store = AliasableBox::from_unique(Box::new(store));
123
124        // Create future to get the first node child.
125        let get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync>> =
126            Box::pin(
127                store.get_raw_block(
128                    node.children
129                        .first()
130                        .map(|(cid, _)| cid)
131                        .ok_or(StoreError::from(LayoutError::NoLeafBlock))?,
132                ),
133            );
134
135        // Unsafe magic to escape Rust ownership grip.
136        let get_raw_block_fn: Pin<
137            Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>,
138        > = unsafe { std::mem::transmute(get_raw_block_fn) };
139
140        Ok(FlatLayoutReader {
141            byte_cursor: 0,
142            chunk_index: 0,
143            chunk_distance: 0,
144            get_raw_block_fn,
145            node,
146            store,
147        })
148    }
149
150    fn fix_future(&mut self) {
151        // Create future to get the next child.
152        let get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync>> =
153            Box::pin(async {
154                let bytes = self
155                    .store
156                    .get_raw_block(
157                        self.node
158                            .children
159                            .get(self.chunk_index as usize)
160                            .map(|(cid, _)| cid)
161                            .ok_or(StoreError::from(LayoutError::NoLeafBlock))?,
162                    )
163                    .await?;
164
165                // We just need bytes starting from byte cursor.
166                let bytes = Bytes::copy_from_slice(
167                    &bytes[(self.byte_cursor - self.chunk_distance) as usize..],
168                );
169
170                Ok(bytes)
171            });
172
173        // Unsafe magic to escape Rust ownership grip.
174        let get_raw_block_fn: Pin<
175            Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>,
176        > = unsafe { std::mem::transmute(get_raw_block_fn) };
177
178        // Update type's future.
179        self.get_raw_block_fn = get_raw_block_fn;
180    }
181
182    fn read_update(&mut self, left_over: &[u8], consumed: u64) -> StoreResult<()> {
183        // Update the byte cursor.
184        self.byte_cursor += consumed;
185
186        // If there's left over bytes, we create a future to return the left over bytes.
187        if !left_over.is_empty() {
188            let bytes = Bytes::copy_from_slice(left_over);
189            let get_raw_block_fn = Box::pin(async { Ok(bytes) });
190            self.get_raw_block_fn = get_raw_block_fn;
191            return Ok(());
192        }
193
194        // If we've reached the end of the bytes, create a future that returns empty bytes.
195        if self.byte_cursor >= self.node.size as u64 {
196            let get_raw_block_fn = Box::pin(async { Ok(Bytes::new()) });
197            self.get_raw_block_fn = get_raw_block_fn;
198            return Ok(());
199        }
200
201        // Update the chunk distance and chunk index.
202        self.chunk_distance += self.node.children[self.chunk_index as usize].1 as u64;
203        self.chunk_index += 1;
204
205        // Update the future.
206        self.fix_future();
207
208        Ok(())
209    }
210
211    fn seek_update(&mut self, byte_cursor: u64) -> StoreResult<()> {
212        // Update the byte cursor.
213        self.byte_cursor = byte_cursor;
214
215        // If we've reached the end of the bytes, create a future that returns empty bytes.
216        if self.byte_cursor >= self.node.size as u64 {
217            let get_raw_block_fn = Box::pin(async { Ok(Bytes::new()) });
218            self.get_raw_block_fn = get_raw_block_fn;
219            return Ok(());
220        }
221
222        // We need to update the chunk index and distance essentially making sure that chunk index and distance
223        // are referring to the chunk that the byte cursor is pointing to.
224        loop {
225            match self.chunk_distance.cmp(&byte_cursor) {
226                Ordering::Less => {
227                    if self.chunk_distance + self.node.children[self.chunk_index as usize].1 as u64
228                        > byte_cursor
229                    {
230                        break;
231                    }
232
233                    self.chunk_distance += self.node.children[self.chunk_index as usize].1 as u64;
234                    self.chunk_index += 1;
235
236                    continue;
237                }
238                Ordering::Greater => {
239                    self.chunk_distance -= self.node.children[self.chunk_index as usize].1 as u64;
240                    self.chunk_index -= 1;
241
242                    continue;
243                }
244                _ => break,
245            }
246        }
247
248        // Update the future.
249        self.fix_future();
250
251        Ok(())
252    }
253}
254
255//--------------------------------------------------------------------------------------------------
256// Trait Implementations
257//--------------------------------------------------------------------------------------------------
258
259impl Layout for FlatLayout {
260    async fn organize<'a>(
261        &self,
262        mut stream: BoxStream<'a, StoreResult<Bytes>>,
263        store: impl IpldStore + Send + 'a,
264    ) -> StoreResult<BoxStream<'a, StoreResult<Cid>>> {
265        let s = try_stream! {
266            let mut children = Vec::new();
267            while let Some(Ok(chunk)) = stream.next().await {
268                let len = chunk.len();
269                let cid = store.put_raw_block(chunk).await?;
270                children.push((cid, len));
271                yield cid;
272            }
273
274            let node = MerkleNode::new(children);
275            let cid = store.put_node(&node).await?;
276
277            yield cid;
278        };
279
280        Ok(Box::pin(s))
281    }
282
283    async fn retrieve<'a>(
284        &self,
285        cid: &Cid,
286        store: impl IpldStore + Send + Sync + 'a,
287    ) -> StoreResult<Pin<Box<dyn AsyncRead + Send + Sync + 'a>>> {
288        let node = store.get_node(cid).await?;
289        let reader = FlatLayoutReader::new(node, store)?;
290        Ok(Box::pin(reader))
291    }
292}
293
294impl LayoutSeekable for FlatLayout {
295    async fn retrieve_seekable<'a>(
296        &self,
297        cid: &'a Cid,
298        store: impl IpldStore + Send + Sync + 'a,
299    ) -> StoreResult<Pin<Box<dyn SeekableReader + Send + 'a>>> {
300        let node = store.get_node(cid).await?;
301        let reader = FlatLayoutReader::new(node, store)?;
302        Ok(Box::pin(reader))
303    }
304}
305
306impl<S> AsyncRead for FlatLayoutReader<S>
307where
308    S: IpldStore + Sync,
309{
310    fn poll_read(
311        mut self: Pin<&mut Self>,
312        cx: &mut Context<'_>,
313        buf: &mut ReadBuf<'_>,
314    ) -> Poll<std::io::Result<()>> {
315        // Get the next chunk of bytes.
316        let bytes = ready!(self.get_raw_block_fn.as_mut().poll(cx))
317            .map_err(|e| Error::new(ErrorKind::Other, e))?;
318
319        // If the bytes is longer than the buffer, we only take the amount that fits.
320        let (taken, left_over) = if bytes.len() > buf.remaining() {
321            bytes.split_at(buf.remaining())
322        } else {
323            (&bytes[..], &[][..])
324        };
325
326        // Copy the slice to the buffer.
327        buf.put_slice(taken);
328
329        // Update the reader's state.
330        self.read_update(left_over, taken.len() as u64)
331            .map_err(|e| Error::new(ErrorKind::Other, e))?;
332
333        Poll::Ready(Ok(()))
334    }
335}
336
337impl<S> AsyncSeek for FlatLayoutReader<S>
338where
339    S: IpldStore + Sync,
340{
341    fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
342        let byte_cursor = match position {
343            SeekFrom::Start(offset) => {
344                if offset >= self.node.size as u64 {
345                    return Err(Error::new(
346                        ErrorKind::InvalidInput,
347                        "Seek from start position out of bounds",
348                    ));
349                }
350
351                offset
352            }
353            SeekFrom::Current(offset) => {
354                let new_cursor = self.byte_cursor as i64 + offset;
355                if new_cursor < 0 || new_cursor >= self.node.size as i64 {
356                    return Err(Error::new(
357                        ErrorKind::InvalidInput,
358                        "Seek from current position out of bounds",
359                    ));
360                }
361
362                new_cursor as u64
363            }
364            SeekFrom::End(offset) => {
365                let new_cursor = self.node.size as i64 + offset;
366                if new_cursor < 0 || new_cursor >= self.node.size as i64 {
367                    return Err(Error::new(
368                        ErrorKind::InvalidInput,
369                        "Seek from end position out of bounds",
370                    ));
371                }
372
373                new_cursor as u64
374            }
375        };
376
377        // Update the reader's state.
378        self.seek_update(byte_cursor)
379            .map_err(|e| Error::new(ErrorKind::Other, e))?;
380
381        Ok(())
382    }
383
384    fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
385        Poll::Ready(Ok(self.byte_cursor))
386    }
387}
388
389//--------------------------------------------------------------------------------------------------
390// Tests
391//--------------------------------------------------------------------------------------------------
392
#[cfg(test)]
mod tests {
    use futures::TryStreamExt;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use crate::MemoryStore;

    use super::*;

    /// Round-trips the fixture data through `organize` and `retrieve`. The data is read back
    /// once with `read_to_end` and once with a loop of small `read` calls.
    #[tokio::test]
    async fn test_flat_dag_layout_organize_and_retrieve() -> anyhow::Result<()> {
        let store = MemoryStore::default();
        let (data, _, chunk_stream) = fixtures::data_and_chunk_stream();

        // Organize chunks into a DAG.
        let layout = FlatLayout::default();
        let cid_stream = layout.organize(chunk_stream, store.clone()).await?;

        // Get the CID of the merkle node, which `organize` yields last.
        let cids = cid_stream.try_collect::<Vec<_>>().await?;
        let cid = cids.last().unwrap();

        // Case: fill buffer automatically with `read_to_end`
        let mut reader = layout.retrieve(cid, store.clone()).await?;
        let mut bytes = Vec::new();
        reader.read_to_end(&mut bytes).await?;

        assert_eq!(bytes, data);

        // Case: fill buffer manually with `read`
        let mut reader = layout.retrieve(cid, store).await?;
        let mut bytes: Vec<u8> = vec![];
        loop {
            let mut buf = vec![0; 5];
            let filled = reader.read(&mut buf).await?;
            if filled == 0 {
                break;
            }

            bytes.extend(&buf[..filled]);
        }

        assert_eq!(bytes, data);

        Ok(())
    }

    /// Exercises every `SeekFrom` variant on a seekable reader, including out-of-bounds targets.
    ///
    /// Reads use `read_exact` because a plain `read` may legally return fewer bytes than
    /// requested, which would leave part of the compared buffer unfilled.
    #[tokio::test]
    async fn test_flat_dag_layout_seek() -> anyhow::Result<()> {
        let store = MemoryStore::default();
        let (_, chunks, chunk_stream) = fixtures::data_and_chunk_stream();

        // Organize chunks into a DAG.
        let layout = FlatLayout::default();
        let cid_stream = layout.organize(chunk_stream, store.clone()).await?;

        // Get the CID of the merkle node, which `organize` yields last.
        let cids = cid_stream.try_collect::<Vec<_>>().await?;
        let cid = cids.last().unwrap();

        // Get seekable reader.
        let mut reader = layout.retrieve_seekable(cid, store).await?;

        // Case: read the first chunk.
        let mut buf = vec![0; 5];
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[0]);

        // Case: skip a chunk by seeking from current and have cursor be at boundary of chunk.
        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::Current(5)).await?;
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[2]);

        // Case: seek to the next chunk from current and have cursor be in the middle of chunk.
        let mut buf = vec![0; 3];
        reader.seek(SeekFrom::Current(3)).await?;
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[3][3..]);

        // Case: Seek to some chunk before end.
        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::End(-5)).await?;
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[9]);

        // Case: Seek to some chunk after start.
        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::Start(5)).await?;
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[1]);

        // Case: Fail: Seek beyond end.
        let result = reader.seek(SeekFrom::End(5)).await;
        assert!(result.is_err());

        let result = reader.seek(SeekFrom::End(0)).await;
        assert!(result.is_err());

        let result = reader.seek(SeekFrom::Start(100)).await;
        assert!(result.is_err());

        let result = reader.seek(SeekFrom::Current(100)).await;
        assert!(result.is_err());

        // Case: Fail: Seek before start.
        let result = reader.seek(SeekFrom::Current(-100)).await;
        assert!(result.is_err());

        Ok(())
    }
}
510
#[cfg(test)]
mod fixtures {
    use futures::{stream, Stream};

    use super::*;

    /// Text pieces that, concatenated in order, spell out the fixture sentence.
    const PIECES: [&str; 10] = [
        "Lorem", " ipsu", "m dol", "or sit", " amet,", " conse", "ctetur", " adipi", "scing ",
        "elit.",
    ];

    /// Returns the fixture sentence, the ten chunks it is split into, and a boxed stream that
    /// yields each of those chunks wrapped in `Ok`.
    pub(super) fn data_and_chunk_stream() -> (
        [u8; 56],
        Vec<Bytes>,
        Pin<Box<dyn Stream<Item = StoreResult<Bytes>> + Send + 'static>>,
    ) {
        let data = *b"Lorem ipsum dolor sit amet, consectetur adipiscing elit.";
        let chunks: Vec<Bytes> = PIECES.iter().map(|piece| Bytes::from(*piece)).collect();
        let items: Vec<_> = chunks.iter().map(|chunk| crate::Ok(chunk.clone())).collect();

        (data, chunks, Box::pin(stream::iter(items)))
    }
}