apt_swarm/db/
consume.rs

1use crate::db;
2use crate::db::compression;
3use crate::db::header::BlockHeader;
4use crate::errors::*;
5use async_trait::async_trait;
6use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, SeekFrom};
7
8#[async_trait]
9pub trait Consume {
10    type Item: Ord + 'static;
11
12    async fn consume<R: AsyncRead + AsyncSeek + Unpin + Send>(
13        mut reader: R,
14        header: &BlockHeader,
15    ) -> Result<Self::Item>;
16}
17
18/// Read and return data
19pub struct ReadValue;
20
21#[async_trait]
22impl Consume for ReadValue {
23    type Item = db::Value;
24
25    async fn consume<R: AsyncRead + AsyncSeek + Unpin + Send>(
26        mut reader: R,
27        header: &BlockHeader,
28    ) -> Result<Self::Item> {
29        let mut compressed = vec![0u8; header.data_length as usize];
30        reader
31            .read_exact(&mut compressed)
32            .await
33            .context("Failed to read block data")?;
34
35        let data = compression::decompress(&compressed)
36            .await
37            .context("Failed to decompress block data")?;
38
39        Ok(data)
40    }
41}
42
43/// Skip over data immediately, possibly beyond EOF
44pub struct FastSkipValue;
45
46#[async_trait]
47impl Consume for FastSkipValue {
48    type Item = ();
49
50    async fn consume<R: AsyncRead + AsyncSeek + Unpin + Send>(
51        mut reader: R,
52        header: &BlockHeader,
53    ) -> Result<Self::Item> {
54        reader
55            .seek(SeekFrom::Current(header.data_length as i64))
56            .await
57            .context("Failed to seek over block data")?;
58        trace!("Seeked forward by {} bytes", header.data_length);
59        Ok(())
60    }
61}
62
63/// Skip over data, verify enough data is present before seek
64pub struct CheckedSkipValue;
65
66#[async_trait]
67impl Consume for CheckedSkipValue {
68    type Item = ();
69
70    async fn consume<R: AsyncRead + AsyncSeek + Unpin + Send>(
71        mut reader: R,
72        header: &BlockHeader,
73    ) -> Result<Self::Item> {
74        // determine if enough data is available
75        let pos = reader
76            .stream_position()
77            .await
78            .context("Failed to determine stream position")?;
79        let end = reader
80            .seek(SeekFrom::End(0))
81            .await
82            .context("Failed to seek to file end")?;
83
84        let remaining = end - pos;
85        if remaining < header.data_length {
86            bail!(
87                "Seek failed, missing {} bytes",
88                header.data_length - remaining
89            );
90        }
91
92        // we have enough data, perform seek
93        let new = reader
94            .seek(SeekFrom::Start(pos + header.data_length))
95            .await
96            .context("Failed to seek over block data")?;
97        trace!("Seeked forward by {} bytes", new - pos);
98        Ok(())
99    }
100}