1use crate::db;
2use crate::db::compression;
3use crate::db::header::BlockHeader;
4use crate::errors::*;
5use async_trait::async_trait;
6use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, SeekFrom};
7
8#[async_trait]
9pub trait Consume {
10 type Item: Ord + 'static;
11
12 async fn consume<R: AsyncRead + AsyncSeek + Unpin + Send>(
13 mut reader: R,
14 header: &BlockHeader,
15 ) -> Result<Self::Item>;
16}
17
18pub struct ReadValue;
20
21#[async_trait]
22impl Consume for ReadValue {
23 type Item = db::Value;
24
25 async fn consume<R: AsyncRead + AsyncSeek + Unpin + Send>(
26 mut reader: R,
27 header: &BlockHeader,
28 ) -> Result<Self::Item> {
29 let mut compressed = vec![0u8; header.data_length as usize];
30 reader
31 .read_exact(&mut compressed)
32 .await
33 .context("Failed to read block data")?;
34
35 let data = compression::decompress(&compressed)
36 .await
37 .context("Failed to decompress block data")?;
38
39 Ok(data)
40 }
41}
42
43pub struct FastSkipValue;
45
46#[async_trait]
47impl Consume for FastSkipValue {
48 type Item = ();
49
50 async fn consume<R: AsyncRead + AsyncSeek + Unpin + Send>(
51 mut reader: R,
52 header: &BlockHeader,
53 ) -> Result<Self::Item> {
54 reader
55 .seek(SeekFrom::Current(header.data_length as i64))
56 .await
57 .context("Failed to seek over block data")?;
58 trace!("Seeked forward by {} bytes", header.data_length);
59 Ok(())
60 }
61}
62
63pub struct CheckedSkipValue;
65
66#[async_trait]
67impl Consume for CheckedSkipValue {
68 type Item = ();
69
70 async fn consume<R: AsyncRead + AsyncSeek + Unpin + Send>(
71 mut reader: R,
72 header: &BlockHeader,
73 ) -> Result<Self::Item> {
74 let pos = reader
76 .stream_position()
77 .await
78 .context("Failed to determine stream position")?;
79 let end = reader
80 .seek(SeekFrom::End(0))
81 .await
82 .context("Failed to seek to file end")?;
83
84 let remaining = end - pos;
85 if remaining < header.data_length {
86 bail!(
87 "Seek failed, missing {} bytes",
88 header.data_length - remaining
89 );
90 }
91
92 let new = reader
94 .seek(SeekFrom::Start(pos + header.data_length))
95 .await
96 .context("Failed to seek over block data")?;
97 trace!("Seeked forward by {} bytes", new - pos);
98 Ok(())
99 }
100}