1use std::{collections::HashMap, convert::Infallible};
2
3use futures::{AsyncReadExt as _, AsyncSeekExt as _};
4use ipld_core::cid::{multihash::Multihash, Cid, Version};
5use serde::{Deserialize, Serialize};
6use sha2::Digest;
7use tokio::io::{
8 AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, AsyncWrite, AsyncWriteExt as _,
9 SeekFrom,
10};
11use tokio_util::compat::TokioAsyncReadCompatExt;
12use unsigned_varint::io::ReadError;
13
14use crate::blockstore::{self, AsyncBlockStoreRead, SHA2_256};
15
16use super::AsyncBlockStoreWrite;
17
/// Header of a CARv1 stream.
///
/// On disk it is DAG-CBOR encoded and preceded by a varint holding the encoded length
/// (see `CarStore::create_with_roots`, which writes it, and `CarStore::open`, which reads it).
#[derive(Debug, Serialize, Deserialize)]
pub struct V1Header {
    /// CAR format version; `CarStore::create_with_roots` always writes `1`.
    pub version: u64,
    /// Root CIDs recorded in the header, exposed through `CarStore::roots`.
    pub roots: Vec<Cid>,
}
23
24async fn read_cid<R: futures::AsyncRead + futures::AsyncSeek + Unpin>(
25 mut reader: R,
26) -> Result<Cid, Error> {
27 let version = unsigned_varint::aio::read_u64(&mut reader).await?;
28 let codec = unsigned_varint::aio::read_u64(&mut reader).await?;
29
30 if [version, codec] == [0x12, 0x20] {
32 let mut digest = [0u8; 32];
33 reader.read_exact(&mut digest).await?;
34 let mh = Multihash::wrap(version, &digest).expect("Digest is always 32 bytes.");
35 return Ok(Cid::new_v0(mh)?);
36 }
37
38 let version = Version::try_from(version)?;
39 match version {
40 Version::V0 => Err(Error::InvalidCidV0),
41 Version::V1 => {
42 let start = reader.stream_position().await?;
43 let _code = unsigned_varint::aio::read_u64(&mut reader).await?;
44 let size = unsigned_varint::aio::read_u64(&mut reader).await?;
45 let len = (reader.stream_position().await? - start) + size;
46
47 let mut mh_bytes = vec![0; len as usize];
48 reader.seek(SeekFrom::Start(start)).await?;
49 reader.read_exact(&mut mh_bytes).await?;
50
51 let mh = Multihash::from_bytes(&mh_bytes)?;
52 Ok(Cid::new(version, codec, mh)?)
53 }
54 }
55}
56
57#[derive(Debug)]
59pub struct CarStore<S: AsyncRead + AsyncSeek> {
60 storage: S,
61 header: V1Header,
62 index: HashMap<Cid, (u64, usize)>,
63}
64
65impl<R: AsyncRead + AsyncSeek + Unpin> CarStore<R> {
66 pub async fn open(mut storage: R) -> Result<Self, Error> {
68 let header_len = unsigned_varint::aio::read_usize((&mut storage).compat()).await?;
70 let mut header_bytes = vec![0; header_len];
71 storage.read_exact(&mut header_bytes).await?;
72 let header: V1Header = serde_ipld_dagcbor::from_slice(&header_bytes)?;
73
74 let mut buffer = Vec::new();
75
76 let mut index = HashMap::new();
78 loop {
79 match unsigned_varint::aio::read_u64((&mut storage).compat()).await {
80 Ok(data_len) => {
81 let start = storage.stream_position().await?;
82 let cid = read_cid((&mut storage).compat()).await?;
83 let offset = storage.stream_position().await?;
84 let len = data_len - (offset - start);
85 buffer.resize(len as usize, 0);
89 storage.read_exact(buffer.as_mut_slice()).await?;
90
91 let digest = match cid.hash().code() {
92 SHA2_256 => Some(sha2::Sha256::digest(buffer.as_slice())),
93 _ => None,
95 };
96
97 if let Some(digest) = digest {
98 let expected = Multihash::wrap(cid.hash().code(), digest.as_slice())
99 .map_err(Error::Multihash)?;
100 let expected = Cid::new_v1(cid.codec(), expected);
101
102 if expected != cid {
103 return Err(Error::InvalidHash);
104 }
105 }
106
107 index.insert(cid, (offset, len as usize));
108 }
109 Err(ReadError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
110 Err(e) => Err(e)?,
111 }
112 }
113
114 Ok(Self { storage, header, index })
115 }
116
117 pub fn roots(&self) -> impl Iterator<Item = Cid> {
118 self.header.roots.clone().into_iter()
119 }
120}
121
122impl<S: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin> CarStore<S> {
123 pub async fn create(storage: S) -> Result<Self, Error> {
124 Self::create_with_roots(storage, []).await
125 }
126
127 pub async fn create_with_roots(
128 mut storage: S,
129 roots: impl IntoIterator<Item = Cid>,
130 ) -> Result<Self, Error> {
131 let header = V1Header { version: 1, roots: roots.into_iter().collect::<Vec<_>>() };
132
133 let header_bytes = serde_ipld_dagcbor::to_vec(&header).unwrap();
134 let mut buf = unsigned_varint::encode::usize_buffer();
135 let buf = unsigned_varint::encode::usize(header_bytes.len(), &mut buf);
136 storage.write_all(buf).await?;
137 storage.write_all(&header_bytes).await?;
138
139 Ok(Self { storage, header, index: HashMap::new() })
140 }
141}
142
143impl<R: AsyncRead + AsyncSeek + Send + Unpin> AsyncBlockStoreRead for CarStore<R> {
144 async fn read_block_into(
145 &mut self,
146 cid: Cid,
147 contents: &mut Vec<u8>,
148 ) -> Result<(), blockstore::Error> {
149 contents.clear();
150
151 let (offset, len) = self.index.get(&cid).ok_or_else(|| blockstore::Error::CidNotFound)?;
152 contents.resize(*len, 0);
153
154 self.storage.seek(SeekFrom::Start(*offset)).await?;
155 self.storage.read_exact(contents).await?;
156
157 Ok(())
158 }
159}
160
161impl<R: AsyncRead + AsyncWrite + AsyncSeek + Send + Unpin> AsyncBlockStoreWrite for CarStore<R> {
162 async fn write_block(
163 &mut self,
164 codec: u64,
165 hash: u64,
166 contents: &[u8],
167 ) -> Result<Cid, blockstore::Error> {
168 let digest = match hash {
169 SHA2_256 => sha2::Sha256::digest(contents),
170 _ => return Err(blockstore::Error::UnsupportedHash(hash)),
171 };
172 let hash =
173 Multihash::wrap(hash, digest.as_slice()).expect("internal error encoding multihash");
174 let cid = Cid::new_v1(codec, hash);
175
176 if let std::collections::hash_map::Entry::Vacant(e) = self.index.entry(cid) {
178 let mut fc = vec![];
179 cid.write_bytes(&mut fc).expect("internal error writing CID");
180
181 let mut buf = unsigned_varint::encode::u64_buffer();
182 let buf = unsigned_varint::encode::u64((fc.len() + contents.len()) as u64, &mut buf);
183
184 self.storage.seek(SeekFrom::End(0)).await?;
185 self.storage.write_all(buf).await?;
186 self.storage.write_all(&fc).await?;
187 let offs = self.storage.stream_position().await?;
188 self.storage.write_all(contents).await?;
189
190 e.insert((offs, contents.len()));
192 }
193
194 Ok(cid)
195 }
196}
197
/// Errors produced while opening, parsing or creating a CAR store.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A CID could not be constructed or its version was not recognised.
    #[error("invalid CID: {0}")]
    Cid(#[from] ipld_core::cid::Error),
    /// The requested CID is not present in the CAR.
    #[error("CID does not exist in CAR")]
    CidNotFound,
    /// A block's contents do not hash to the digest recorded in its CID.
    #[error("file hash does not match computed hash for block")]
    InvalidHash,
    /// A CID was encoded with an explicit version-0 prefix, which is not a valid encoding.
    #[error("invalid explicit CID v0")]
    InvalidCidV0,
    /// A varint (length prefix or CID component) could not be read or decoded.
    #[error("invalid varint: {0}")]
    InvalidVarint(#[from] unsigned_varint::io::ReadError),
    /// An underlying read, write or seek on the storage failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// A multihash could not be decoded or constructed.
    #[error("invalid Multihash: {0}")]
    Multihash(#[from] ipld_core::cid::multihash::Error),
    /// The CAR header bytes could not be decoded as DAG-CBOR into a `V1Header`.
    #[error("serde_ipld_dagcbor decoding error: {0}")]
    Parse(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
}
218
#[cfg(test)]
mod test {
    use std::io::Cursor;

    use crate::blockstore::{MemoryBlockStore, DAG_CBOR};

    use super::*;

    /// Payload shared by the round-trip tests.
    const FOX: &[u8] = b"the quick brown fox jumps over the lazy dog";

    /// A single block is readable from the live store and after reopening the bytes.
    #[tokio::test]
    async fn basic_rw() {
        let mut mem = Vec::new();

        let mut writer = CarStore::create(Cursor::new(&mut mem)).await.unwrap();
        let cid = writer.write_block(DAG_CBOR, SHA2_256, FOX).await.unwrap();
        assert_eq!(writer.read_block(cid).await.unwrap(), FOX);

        let mut reader = CarStore::open(Cursor::new(&mut mem)).await.unwrap();
        assert_eq!(reader.read_block(cid).await.unwrap(), FOX);
    }

    /// Two distinct blocks are both indexed correctly, before and after reopening.
    #[tokio::test]
    async fn basic_rw_2blocks() {
        const OTHER: &[u8] = b"the lazy fox jumps over the quick brown dog";

        let mut mem = Vec::new();

        let mut writer = CarStore::create(Cursor::new(&mut mem)).await.unwrap();
        let mut written = Vec::new();
        for data in [FOX, OTHER] {
            let cid = writer.write_block(DAG_CBOR, SHA2_256, data).await.unwrap();
            written.push((cid, data));
        }
        for &(cid, data) in &written {
            assert_eq!(writer.read_block(cid).await.unwrap(), data);
        }

        let mut reader = CarStore::open(Cursor::new(&mut mem)).await.unwrap();
        for &(cid, data) in &written {
            assert_eq!(reader.read_block(cid).await.unwrap(), data);
        }
    }

    /// Roots given at creation survive a reopen, and agree with a `MemoryBlockStore` CID.
    #[tokio::test]
    async fn basic_root() {
        let mut mbs = MemoryBlockStore::new();
        let cid = mbs.write_block(DAG_CBOR, SHA2_256, FOX).await.unwrap();
        assert_eq!(mbs.read_block(cid).await.unwrap(), FOX);

        let mut mem = Vec::new();

        let mut writer =
            CarStore::create_with_roots(Cursor::new(&mut mem), [cid]).await.unwrap();
        writer.write_block(DAG_CBOR, SHA2_256, FOX).await.unwrap();
        assert_eq!(writer.roots().next().unwrap(), cid);
        assert_eq!(writer.read_block(cid).await.unwrap(), FOX);

        let mut reader = CarStore::open(Cursor::new(&mut mem)).await.unwrap();
        assert_eq!(reader.roots().next().unwrap(), cid);
        assert_eq!(reader.read_block(cid).await.unwrap(), FOX);
    }
}