monoutils_store/implementations/layouts/
flat.rs1use std::{
2 cmp::Ordering,
3 io::{Error, ErrorKind, SeekFrom},
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use aliasable::boxed::AliasableBox;
9use async_stream::try_stream;
10use bytes::Bytes;
11use futures::{ready, stream::BoxStream, Future, StreamExt};
12use libipld::Cid;
13use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
14
15use crate::{
16 IpldStore, Layout, LayoutError, LayoutSeekable, MerkleNode, SeekableReader, StoreError,
17 StoreResult,
18};
19
/// A layout that stores data as a single [`MerkleNode`] whose children are the raw chunk
/// blocks, in order.
///
/// The layout carries no configuration; every instance behaves the same.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
pub struct FlatLayout {}
40
41pub struct FlatLayoutReader<S>
68where
69 S: IpldStore,
70{
71 byte_cursor: u64,
73
74 chunk_index: u64,
76
77 chunk_distance: u64,
79
80 get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>>,
87
88 store: AliasableBox<S>,
94
95 node: AliasableBox<MerkleNode>,
101}
102
103impl FlatLayout {
108 pub fn new() -> Self {
110 FlatLayout {}
111 }
112}
113
114impl<S> FlatLayoutReader<S>
115where
116 S: IpldStore + Sync,
117{
118 fn new(node: MerkleNode, store: S) -> StoreResult<Self> {
120 let node = AliasableBox::from_unique(Box::new(node));
122 let store = AliasableBox::from_unique(Box::new(store));
123
124 let get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync>> =
126 Box::pin(
127 store.get_raw_block(
128 node.children
129 .first()
130 .map(|(cid, _)| cid)
131 .ok_or(StoreError::from(LayoutError::NoLeafBlock))?,
132 ),
133 );
134
135 let get_raw_block_fn: Pin<
137 Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>,
138 > = unsafe { std::mem::transmute(get_raw_block_fn) };
139
140 Ok(FlatLayoutReader {
141 byte_cursor: 0,
142 chunk_index: 0,
143 chunk_distance: 0,
144 get_raw_block_fn,
145 node,
146 store,
147 })
148 }
149
150 fn fix_future(&mut self) {
151 let get_raw_block_fn: Pin<Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync>> =
153 Box::pin(async {
154 let bytes = self
155 .store
156 .get_raw_block(
157 self.node
158 .children
159 .get(self.chunk_index as usize)
160 .map(|(cid, _)| cid)
161 .ok_or(StoreError::from(LayoutError::NoLeafBlock))?,
162 )
163 .await?;
164
165 let bytes = Bytes::copy_from_slice(
167 &bytes[(self.byte_cursor - self.chunk_distance) as usize..],
168 );
169
170 Ok(bytes)
171 });
172
173 let get_raw_block_fn: Pin<
175 Box<dyn Future<Output = StoreResult<Bytes>> + Send + Sync + 'static>,
176 > = unsafe { std::mem::transmute(get_raw_block_fn) };
177
178 self.get_raw_block_fn = get_raw_block_fn;
180 }
181
182 fn read_update(&mut self, left_over: &[u8], consumed: u64) -> StoreResult<()> {
183 self.byte_cursor += consumed;
185
186 if !left_over.is_empty() {
188 let bytes = Bytes::copy_from_slice(left_over);
189 let get_raw_block_fn = Box::pin(async { Ok(bytes) });
190 self.get_raw_block_fn = get_raw_block_fn;
191 return Ok(());
192 }
193
194 if self.byte_cursor >= self.node.size as u64 {
196 let get_raw_block_fn = Box::pin(async { Ok(Bytes::new()) });
197 self.get_raw_block_fn = get_raw_block_fn;
198 return Ok(());
199 }
200
201 self.chunk_distance += self.node.children[self.chunk_index as usize].1 as u64;
203 self.chunk_index += 1;
204
205 self.fix_future();
207
208 Ok(())
209 }
210
211 fn seek_update(&mut self, byte_cursor: u64) -> StoreResult<()> {
212 self.byte_cursor = byte_cursor;
214
215 if self.byte_cursor >= self.node.size as u64 {
217 let get_raw_block_fn = Box::pin(async { Ok(Bytes::new()) });
218 self.get_raw_block_fn = get_raw_block_fn;
219 return Ok(());
220 }
221
222 loop {
225 match self.chunk_distance.cmp(&byte_cursor) {
226 Ordering::Less => {
227 if self.chunk_distance + self.node.children[self.chunk_index as usize].1 as u64
228 > byte_cursor
229 {
230 break;
231 }
232
233 self.chunk_distance += self.node.children[self.chunk_index as usize].1 as u64;
234 self.chunk_index += 1;
235
236 continue;
237 }
238 Ordering::Greater => {
239 self.chunk_distance -= self.node.children[self.chunk_index as usize].1 as u64;
240 self.chunk_index -= 1;
241
242 continue;
243 }
244 _ => break,
245 }
246 }
247
248 self.fix_future();
250
251 Ok(())
252 }
253}
254
255impl Layout for FlatLayout {
260 async fn organize<'a>(
261 &self,
262 mut stream: BoxStream<'a, StoreResult<Bytes>>,
263 store: impl IpldStore + Send + 'a,
264 ) -> StoreResult<BoxStream<'a, StoreResult<Cid>>> {
265 let s = try_stream! {
266 let mut children = Vec::new();
267 while let Some(Ok(chunk)) = stream.next().await {
268 let len = chunk.len();
269 let cid = store.put_raw_block(chunk).await?;
270 children.push((cid, len));
271 yield cid;
272 }
273
274 let node = MerkleNode::new(children);
275 let cid = store.put_node(&node).await?;
276
277 yield cid;
278 };
279
280 Ok(Box::pin(s))
281 }
282
283 async fn retrieve<'a>(
284 &self,
285 cid: &Cid,
286 store: impl IpldStore + Send + Sync + 'a,
287 ) -> StoreResult<Pin<Box<dyn AsyncRead + Send + Sync + 'a>>> {
288 let node = store.get_node(cid).await?;
289 let reader = FlatLayoutReader::new(node, store)?;
290 Ok(Box::pin(reader))
291 }
292}
293
294impl LayoutSeekable for FlatLayout {
295 async fn retrieve_seekable<'a>(
296 &self,
297 cid: &'a Cid,
298 store: impl IpldStore + Send + Sync + 'a,
299 ) -> StoreResult<Pin<Box<dyn SeekableReader + Send + 'a>>> {
300 let node = store.get_node(cid).await?;
301 let reader = FlatLayoutReader::new(node, store)?;
302 Ok(Box::pin(reader))
303 }
304}
305
306impl<S> AsyncRead for FlatLayoutReader<S>
307where
308 S: IpldStore + Sync,
309{
310 fn poll_read(
311 mut self: Pin<&mut Self>,
312 cx: &mut Context<'_>,
313 buf: &mut ReadBuf<'_>,
314 ) -> Poll<std::io::Result<()>> {
315 let bytes = ready!(self.get_raw_block_fn.as_mut().poll(cx))
317 .map_err(|e| Error::new(ErrorKind::Other, e))?;
318
319 let (taken, left_over) = if bytes.len() > buf.remaining() {
321 bytes.split_at(buf.remaining())
322 } else {
323 (&bytes[..], &[][..])
324 };
325
326 buf.put_slice(taken);
328
329 self.read_update(left_over, taken.len() as u64)
331 .map_err(|e| Error::new(ErrorKind::Other, e))?;
332
333 Poll::Ready(Ok(()))
334 }
335}
336
337impl<S> AsyncSeek for FlatLayoutReader<S>
338where
339 S: IpldStore + Sync,
340{
341 fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> std::io::Result<()> {
342 let byte_cursor = match position {
343 SeekFrom::Start(offset) => {
344 if offset >= self.node.size as u64 {
345 return Err(Error::new(
346 ErrorKind::InvalidInput,
347 "Seek from start position out of bounds",
348 ));
349 }
350
351 offset
352 }
353 SeekFrom::Current(offset) => {
354 let new_cursor = self.byte_cursor as i64 + offset;
355 if new_cursor < 0 || new_cursor >= self.node.size as i64 {
356 return Err(Error::new(
357 ErrorKind::InvalidInput,
358 "Seek from current position out of bounds",
359 ));
360 }
361
362 new_cursor as u64
363 }
364 SeekFrom::End(offset) => {
365 let new_cursor = self.node.size as i64 + offset;
366 if new_cursor < 0 || new_cursor >= self.node.size as i64 {
367 return Err(Error::new(
368 ErrorKind::InvalidInput,
369 "Seek from end position out of bounds",
370 ));
371 }
372
373 new_cursor as u64
374 }
375 };
376
377 self.seek_update(byte_cursor)
379 .map_err(|e| Error::new(ErrorKind::Other, e))?;
380
381 Ok(())
382 }
383
384 fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
385 Poll::Ready(Ok(self.byte_cursor))
386 }
387}
388
#[cfg(test)]
mod tests {
    use futures::TryStreamExt;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};

    use crate::MemoryStore;

    use super::*;

    /// Round-trips the fixture through `organize` + `retrieve`, reading all at once and in
    /// small pieces.
    #[tokio::test]
    async fn test_flat_dag_layout_organize_and_retrieve() -> anyhow::Result<()> {
        let store = MemoryStore::default();
        let (data, _, chunk_stream) = fixtures::data_and_chunk_stream();

        let layout = FlatLayout::default();
        let cid_stream = layout.organize(chunk_stream, store.clone()).await?;

        // The root node's CID is the last item, after every chunk CID.
        let cids = cid_stream.try_collect::<Vec<_>>().await?;
        let cid = cids.last().expect("organize yields the root CID last");

        let mut reader = layout.retrieve(cid, store.clone()).await?;
        let mut bytes = Vec::new();
        reader.read_to_end(&mut bytes).await?;

        assert_eq!(bytes, data);

        // Read in 5-byte pieces until a zero-length read signals EOF.
        let mut reader = layout.retrieve(cid, store).await?;
        let mut bytes: Vec<u8> = vec![];
        let mut buf = [0; 5];
        loop {
            let filled = reader.read(&mut buf).await?;
            if filled == 0 {
                break;
            }

            bytes.extend_from_slice(&buf[..filled]);
        }

        assert_eq!(bytes, data);

        Ok(())
    }

    /// Exercises every `SeekFrom` variant, including rejected out-of-bounds targets.
    ///
    /// Uses `read_exact` so each assertion checks a full buffer rather than whatever a single
    /// `read` happened to return.
    #[tokio::test]
    async fn test_flat_dag_layout_seek() -> anyhow::Result<()> {
        let store = MemoryStore::default();
        let (_, chunks, chunk_stream) = fixtures::data_and_chunk_stream();

        let layout = FlatLayout::default();
        let cid_stream = layout.organize(chunk_stream, store.clone()).await?;

        let cids = cid_stream.try_collect::<Vec<_>>().await?;
        let cid = cids.last().expect("organize yields the root CID last");

        let mut reader = layout.retrieve_seekable(cid, store).await?;

        let mut buf = vec![0; 5];
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[0]);

        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::Current(5)).await?;
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[2]);

        let mut buf = vec![0; 3];
        reader.seek(SeekFrom::Current(3)).await?;
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[3][3..]);

        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::End(-5)).await?;
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[9]);

        let mut buf = vec![0; 5];
        reader.seek(SeekFrom::Start(5)).await?;
        reader.read_exact(&mut buf).await?;

        assert_eq!(buf, chunks[1]);

        // Targets at or past the end, or before the start, are rejected.
        let result = reader.seek(SeekFrom::End(5)).await;
        assert!(result.is_err());

        let result = reader.seek(SeekFrom::End(0)).await;
        assert!(result.is_err());

        let result = reader.seek(SeekFrom::Start(100)).await;
        assert!(result.is_err());

        let result = reader.seek(SeekFrom::Current(100)).await;
        assert!(result.is_err());

        let result = reader.seek(SeekFrom::Current(-100)).await;
        assert!(result.is_err());

        Ok(())
    }
}
510
#[cfg(test)]
mod fixtures {
    use futures::{stream, Stream};

    use super::*;

    /// Returns the fixture text, the ten chunks it is split into (in order), and a stream
    /// yielding those same chunks as successful items.
    pub(super) fn data_and_chunk_stream() -> (
        [u8; 56],
        Vec<Bytes>,
        Pin<Box<dyn Stream<Item = StoreResult<Bytes>> + Send + 'static>>,
    ) {
        let data = *b"Lorem ipsum dolor sit amet, consectetur adipiscing elit.";

        // Mixed 5- and 6-byte pieces that concatenate back to `data`.
        let pieces: [&'static str; 10] = [
            "Lorem", " ipsu", "m dol", "or sit", " amet,", " conse", "ctetur", " adipi", "scing ",
            "elit.",
        ];
        let chunks: Vec<Bytes> = pieces.iter().copied().map(Bytes::from).collect();

        let items: Vec<StoreResult<Bytes>> = chunks.iter().cloned().map(crate::Ok).collect();
        let chunk_stream = Box::pin(stream::iter(items));

        (data, chunks, chunk_stream)
    }
}