atrium_repo/blockstore/
car.rs

1use std::{collections::HashMap, convert::Infallible};
2
3use futures::{AsyncReadExt as _, AsyncSeekExt as _};
4use ipld_core::cid::{multihash::Multihash, Cid, Version};
5use serde::{Deserialize, Serialize};
6use sha2::Digest;
7use tokio::io::{
8    AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, AsyncWrite, AsyncWriteExt as _,
9    SeekFrom,
10};
11use tokio_util::compat::TokioAsyncReadCompatExt;
12use unsigned_varint::io::ReadError;
13
14use crate::blockstore::{self, AsyncBlockStoreRead, SHA2_256};
15
16use super::AsyncBlockStoreWrite;
17
18#[derive(Debug, Serialize, Deserialize)]
19pub struct V1Header {
20    pub version: u64,
21    pub roots: Vec<Cid>,
22}
23
24async fn read_cid<R: futures::AsyncRead + futures::AsyncSeek + Unpin>(
25    mut reader: R,
26) -> Result<Cid, Error> {
27    let version = unsigned_varint::aio::read_u64(&mut reader).await?;
28    let codec = unsigned_varint::aio::read_u64(&mut reader).await?;
29
30    // CIDv0 has the fixed `0x12 0x20` prefix
31    if [version, codec] == [0x12, 0x20] {
32        let mut digest = [0u8; 32];
33        reader.read_exact(&mut digest).await?;
34        let mh = Multihash::wrap(version, &digest).expect("Digest is always 32 bytes.");
35        return Ok(Cid::new_v0(mh)?);
36    }
37
38    let version = Version::try_from(version)?;
39    match version {
40        Version::V0 => Err(Error::InvalidCidV0),
41        Version::V1 => {
42            let start = reader.stream_position().await?;
43            let _code = unsigned_varint::aio::read_u64(&mut reader).await?;
44            let size = unsigned_varint::aio::read_u64(&mut reader).await?;
45            let len = (reader.stream_position().await? - start) + size;
46
47            let mut mh_bytes = vec![0; len as usize];
48            reader.seek(SeekFrom::Start(start)).await?;
49            reader.read_exact(&mut mh_bytes).await?;
50
51            let mh = Multihash::from_bytes(&mh_bytes)?;
52            Ok(Cid::new(version, codec, mh)?)
53        }
54    }
55}
56
57/// An indexed reader/writer for CAR files.
58#[derive(Debug)]
59pub struct CarStore<S: AsyncRead + AsyncSeek> {
60    storage: S,
61    header: V1Header,
62    index: HashMap<Cid, (u64, usize)>,
63}
64
65impl<R: AsyncRead + AsyncSeek + Unpin> CarStore<R> {
66    /// Open a pre-existing CAR file.
67    pub async fn open(mut storage: R) -> Result<Self, Error> {
68        // Read the header.
69        let header_len = unsigned_varint::aio::read_usize((&mut storage).compat()).await?;
70        let mut header_bytes = vec![0; header_len];
71        storage.read_exact(&mut header_bytes).await?;
72        let header: V1Header = serde_ipld_dagcbor::from_slice(&header_bytes)?;
73
74        let mut buffer = Vec::new();
75
76        // Build the index.
77        let mut index = HashMap::new();
78        loop {
79            match unsigned_varint::aio::read_u64((&mut storage).compat()).await {
80                Ok(data_len) => {
81                    let start = storage.stream_position().await?;
82                    let cid = read_cid((&mut storage).compat()).await?;
83                    let offset = storage.stream_position().await?;
84                    let len = data_len - (offset - start);
85                    // reader.seek(SeekFrom::Start(offset + len)).await?;
86
87                    // Validate this block's multihash.
88                    buffer.resize(len as usize, 0);
89                    storage.read_exact(buffer.as_mut_slice()).await?;
90
91                    let digest = match cid.hash().code() {
92                        SHA2_256 => Some(sha2::Sha256::digest(buffer.as_slice())),
93                        // FIXME: We should probably warn that we couldn't verify the block.
94                        _ => None,
95                    };
96
97                    if let Some(digest) = digest {
98                        let expected = Multihash::wrap(cid.hash().code(), digest.as_slice())
99                            .map_err(Error::Multihash)?;
100                        let expected = Cid::new_v1(cid.codec(), expected);
101
102                        if expected != cid {
103                            return Err(Error::InvalidHash);
104                        }
105                    }
106
107                    index.insert(cid, (offset, len as usize));
108                }
109                Err(ReadError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
110                Err(e) => Err(e)?,
111            }
112        }
113
114        Ok(Self { storage, header, index })
115    }
116
117    pub fn roots(&self) -> impl Iterator<Item = Cid> {
118        self.header.roots.clone().into_iter()
119    }
120}
121
122impl<S: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin> CarStore<S> {
123    pub async fn create(storage: S) -> Result<Self, Error> {
124        Self::create_with_roots(storage, []).await
125    }
126
127    pub async fn create_with_roots(
128        mut storage: S,
129        roots: impl IntoIterator<Item = Cid>,
130    ) -> Result<Self, Error> {
131        let header = V1Header { version: 1, roots: roots.into_iter().collect::<Vec<_>>() };
132
133        let header_bytes = serde_ipld_dagcbor::to_vec(&header).unwrap();
134        let mut buf = unsigned_varint::encode::usize_buffer();
135        let buf = unsigned_varint::encode::usize(header_bytes.len(), &mut buf);
136        storage.write_all(buf).await?;
137        storage.write_all(&header_bytes).await?;
138
139        Ok(Self { storage, header, index: HashMap::new() })
140    }
141}
142
143impl<R: AsyncRead + AsyncSeek + Send + Unpin> AsyncBlockStoreRead for CarStore<R> {
144    async fn read_block_into(
145        &mut self,
146        cid: Cid,
147        contents: &mut Vec<u8>,
148    ) -> Result<(), blockstore::Error> {
149        contents.clear();
150
151        let (offset, len) = self.index.get(&cid).ok_or_else(|| blockstore::Error::CidNotFound)?;
152        contents.resize(*len, 0);
153
154        self.storage.seek(SeekFrom::Start(*offset)).await?;
155        self.storage.read_exact(contents).await?;
156
157        Ok(())
158    }
159}
160
161impl<R: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin> AsyncBlockStoreWrite for CarStore<R> {
162    async fn write_block(
163        &mut self,
164        codec: u64,
165        hash: u64,
166        contents: &[u8],
167    ) -> Result<Cid, blockstore::Error> {
168        let digest = match hash {
169            SHA2_256 => sha2::Sha256::digest(contents),
170            _ => return Err(blockstore::Error::UnsupportedHash(hash)),
171        };
172        let hash =
173            Multihash::wrap(hash, digest.as_slice()).expect("internal error encoding multihash");
174        let cid = Cid::new_v1(codec, hash);
175
176        // Only write the record if the CAR file does not already contain it.
177        if let std::collections::hash_map::Entry::Vacant(e) = self.index.entry(cid) {
178            let mut fc = vec![];
179            cid.write_bytes(&mut fc).expect("internal error writing CID");
180
181            let mut buf = unsigned_varint::encode::u64_buffer();
182            let buf = unsigned_varint::encode::u64((fc.len() + contents.len()) as u64, &mut buf);
183
184            self.storage.seek(SeekFrom::End(0)).await?;
185            self.storage.write_all(buf).await?;
186            self.storage.write_all(&fc).await?;
187            let offs = self.storage.stream_position().await?;
188            self.storage.write_all(contents).await?;
189
190            // Update the index with the new block.
191            e.insert((offs, contents.len()));
192        }
193
194        Ok(cid)
195    }
196}
197
198/// Errors that can occur while interacting with a CAR.
199#[derive(Debug, thiserror::Error)]
200pub enum Error {
201    #[error("invalid CID: {0}")]
202    Cid(#[from] ipld_core::cid::Error),
203    #[error("CID does not exist in CAR")]
204    CidNotFound,
205    #[error("file hash does not match computed hash for block")]
206    InvalidHash,
207    #[error("invalid explicit CID v0")]
208    InvalidCidV0,
209    #[error("invalid varint: {0}")]
210    InvalidVarint(#[from] unsigned_varint::io::ReadError),
211    #[error("IO error: {0}")]
212    Io(#[from] std::io::Error),
213    #[error("invalid Multihash: {0}")]
214    Multihash(#[from] ipld_core::cid::multihash::Error),
215    #[error("serde_ipld_dagcbor decoding error: {0}")]
216    Parse(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
217}
218
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use crate::blockstore::{MemoryBlockStore, DAG_CBOR};

    use super::*;

    /// A single block can be read back both from the store that wrote it and from a store
    /// re-opened over the same bytes.
    #[tokio::test]
    async fn basic_rw() {
        const STR: &[u8] = b"the quick brown fox jumps over the lazy dog";

        let mut backing = Vec::new();
        let mut writer = CarStore::create(Cursor::new(&mut backing)).await.unwrap();

        let cid = writer.write_block(DAG_CBOR, SHA2_256, STR).await.unwrap();
        assert_eq!(writer.read_block(cid).await.unwrap(), STR);

        let mut reader = CarStore::open(Cursor::new(&mut backing)).await.unwrap();
        assert_eq!(reader.read_block(cid).await.unwrap(), STR);
    }

    /// Two distinct blocks are indexed independently, before and after re-opening.
    #[tokio::test]
    async fn basic_rw_2blocks() {
        const STR1: &[u8] = b"the quick brown fox jumps over the lazy dog";
        const STR2: &[u8] = b"the lazy fox jumps over the quick brown dog";

        let mut backing = Vec::new();
        let mut writer = CarStore::create(Cursor::new(&mut backing)).await.unwrap();

        let cid1 = writer.write_block(DAG_CBOR, SHA2_256, STR1).await.unwrap();
        let cid2 = writer.write_block(DAG_CBOR, SHA2_256, STR2).await.unwrap();
        for (cid, expected) in [(cid1, STR1), (cid2, STR2)] {
            assert_eq!(writer.read_block(cid).await.unwrap(), expected);
        }

        let mut reader = CarStore::open(Cursor::new(&mut backing)).await.unwrap();
        for (cid, expected) in [(cid1, STR1), (cid2, STR2)] {
            assert_eq!(reader.read_block(cid).await.unwrap(), expected);
        }
    }

    /// Roots passed at creation are preserved in the header and survive re-opening, and the
    /// CAR computes the same CID as the in-memory block store.
    #[tokio::test]
    async fn basic_root() {
        const STR: &[u8] = b"the quick brown fox jumps over the lazy dog";

        let mut memory = MemoryBlockStore::new();

        let cid = memory.write_block(DAG_CBOR, SHA2_256, STR).await.unwrap();
        assert_eq!(memory.read_block(cid).await.unwrap(), STR);

        let mut backing = Vec::new();
        let mut writer =
            CarStore::create_with_roots(Cursor::new(&mut backing), [cid]).await.unwrap();
        writer.write_block(DAG_CBOR, SHA2_256, STR).await.unwrap();

        assert_eq!(writer.roots().next().unwrap(), cid);
        assert_eq!(writer.read_block(cid).await.unwrap(), STR);

        let mut reader = CarStore::open(Cursor::new(&mut backing)).await.unwrap();
        assert_eq!(reader.roots().next().unwrap(), cid);
        assert_eq!(reader.read_block(cid).await.unwrap(), STR);
    }
}