1use std::{collections::BTreeMap, future::Future};
3
4use anyhow::Context;
5use bao_tree::blake3;
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8
9use crate::{
10 api::{blobs::AddBytesOptions, Store},
11 get::{fsm, Stats},
12 hashseq::HashSeq,
13 util::temp_tag::TempTag,
14 BlobFormat, Hash,
15};
16
17#[derive(Clone, Debug, PartialEq, Deserialize, Serialize, Default)]
21pub struct Collection {
22 blobs: Vec<(String, Hash)>,
24}
25
26impl std::ops::Index<usize> for Collection {
27 type Output = (String, Hash);
28
29 fn index(&self, index: usize) -> &Self::Output {
30 &self.blobs[index]
31 }
32}
33
34impl<K, V> Extend<(K, V)> for Collection
35where
36 K: Into<String>,
37 V: Into<Hash>,
38{
39 fn extend<T: IntoIterator<Item = (K, V)>>(&mut self, iter: T) {
40 self.blobs
41 .extend(iter.into_iter().map(|(k, v)| (k.into(), v.into())));
42 }
43}
44
45impl<K, V> FromIterator<(K, V)> for Collection
46where
47 K: Into<String>,
48 V: Into<Hash>,
49{
50 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
51 let mut res = Self::default();
52 res.extend(iter);
53 res
54 }
55}
56
57impl IntoIterator for Collection {
58 type Item = (String, Hash);
59 type IntoIter = std::vec::IntoIter<Self::Item>;
60
61 fn into_iter(self) -> Self::IntoIter {
62 self.blobs.into_iter()
63 }
64}
65
66pub trait SimpleStore {
68 fn load(&self, hash: Hash) -> impl Future<Output = anyhow::Result<Bytes>> + Send + '_;
70}
71
72impl SimpleStore for crate::api::Store {
73 async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
74 Ok(self.get_bytes(hash).await?)
75 }
76}
77
78#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
82struct CollectionMeta {
83 header: [u8; 13], names: Vec<String>,
85}
86
87impl Collection {
88 pub const HEADER: &'static [u8; 13] = b"CollectionV0.";
92
93 pub fn to_blobs(&self) -> impl DoubleEndedIterator<Item = Bytes> {
99 let meta = CollectionMeta {
100 header: *Self::HEADER,
101 names: self.names(),
102 };
103 let meta_bytes = postcard::to_stdvec(&meta).unwrap();
104 let meta_bytes_hash = blake3::hash(&meta_bytes).into();
105 let links = std::iter::once(meta_bytes_hash)
106 .chain(self.links())
107 .collect::<HashSeq>();
108 let links_bytes = links.into_inner();
109 [meta_bytes.into(), links_bytes].into_iter()
110 }
111
112 pub async fn read_fsm(
117 fsm_at_start_root: fsm::AtStartRoot,
118 ) -> anyhow::Result<(fsm::EndBlobNext, HashSeq, Collection)> {
119 let (next, links) = {
120 let curr = fsm_at_start_root.next();
121 let (curr, data) = curr.concatenate_into_vec().await?;
122 let links = HashSeq::new(data.into()).context("links could not be parsed")?;
123 (curr.next(), links)
124 };
125 let fsm::EndBlobNext::MoreChildren(at_meta) = next else {
126 anyhow::bail!("expected meta");
127 };
128 let (next, collection) = {
129 let mut children = links.clone();
130 let meta_link = children.pop_front().context("meta link not found")?;
131 let curr = at_meta.next(meta_link);
132 let (curr, names) = curr.concatenate_into_vec().await?;
133 let names = postcard::from_bytes::<CollectionMeta>(&names)?;
134 anyhow::ensure!(
135 names.header == *Self::HEADER,
136 "expected header {:?}, got {:?}",
137 Self::HEADER,
138 names.header
139 );
140 let collection = Collection::from_parts(children, names);
141 (curr.next(), collection)
142 };
143 Ok((next, links, collection))
144 }
145
146 pub async fn read_fsm_all(
150 fsm_at_start_root: crate::get::fsm::AtStartRoot,
151 ) -> anyhow::Result<(Collection, BTreeMap<u64, Bytes>, Stats)> {
152 let (next, links, collection) = Self::read_fsm(fsm_at_start_root).await?;
153 let mut res = BTreeMap::new();
154 let mut curr = next;
155 let end = loop {
156 match curr {
157 fsm::EndBlobNext::MoreChildren(more) => {
158 let child_offset = more.offset() - 1;
159 let Some(hash) = links.get(usize::try_from(child_offset)?) else {
160 break more.finish();
161 };
162 let header = more.next(hash);
163 let (next, blob) = header.concatenate_into_vec().await?;
164 res.insert(child_offset - 1, blob.into());
165 curr = next.next();
166 }
167 fsm::EndBlobNext::Closing(closing) => break closing,
168 }
169 };
170 let stats = end.next().await?;
171 Ok((collection, res, stats))
172 }
173
174 pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result<Self> {
176 let hs = store.load(root).await?;
177 let hs = HashSeq::try_from(hs)?;
178 let meta_hash = hs.iter().next().context("empty hash seq")?;
179 let meta = store.load(meta_hash).await?;
180 let meta: CollectionMeta = postcard::from_bytes(&meta)?;
181 anyhow::ensure!(
182 meta.names.len() + 1 == hs.len(),
183 "names and links length mismatch"
184 );
185 Ok(Self::from_parts(hs.into_iter().skip(1), meta))
186 }
187
188 pub async fn store(self, db: &Store) -> anyhow::Result<TempTag> {
191 let (links, meta) = self.into_parts();
192 let meta_bytes = postcard::to_stdvec(&meta)?;
193 let meta_tag = db.add_bytes(meta_bytes).temp_tag().await?;
194 let links_bytes = std::iter::once(*meta_tag.hash())
195 .chain(links)
196 .collect::<HashSeq>();
197 let links_tag = db
198 .add_bytes_with_opts(AddBytesOptions {
199 data: links_bytes.into(),
200 format: BlobFormat::HashSeq,
201 })
202 .temp_tag()
203 .await?;
204 Ok(links_tag)
205 }
206
207 fn into_parts(self) -> (Vec<Hash>, CollectionMeta) {
209 let mut names = Vec::with_capacity(self.blobs.len());
210 let mut links = Vec::with_capacity(self.blobs.len());
211 for (name, hash) in self.blobs {
212 names.push(name);
213 links.push(hash);
214 }
215 let meta = CollectionMeta {
216 header: *Self::HEADER,
217 names,
218 };
219 (links, meta)
220 }
221
222 fn from_parts(links: impl IntoIterator<Item = Hash>, meta: CollectionMeta) -> Self {
224 meta.names.into_iter().zip(links).collect()
225 }
226
227 fn links(&self) -> impl Iterator<Item = Hash> + '_ {
229 self.blobs.iter().map(|(_name, hash)| *hash)
230 }
231
232 fn names(&self) -> Vec<String> {
234 self.blobs.iter().map(|(name, _)| name.clone()).collect()
235 }
236
237 pub fn iter(&self) -> impl Iterator<Item = &(String, Hash)> {
239 self.blobs.iter()
240 }
241
242 pub fn len(&self) -> usize {
244 self.blobs.len()
245 }
246
247 pub fn is_empty(&self) -> bool {
249 self.blobs.is_empty()
250 }
251
252 pub fn push(&mut self, name: String, hash: Hash) {
254 self.blobs.push((name, hash));
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// A single `(name, hash)` entry survives a postcard encode/decode cycle.
    #[test]
    fn roundtrip_blob() {
        let hash: Hash = blake3::Hash::from_hex(
            "3aa61c409fd7717c9d9c639202af2fae470c0ef669be7ba2caea5779cb534e9d",
        )
        .unwrap()
        .into();
        let entry = ("test".to_string(), hash);

        let mut scratch = bytes::BytesMut::zeroed(1024);
        postcard::to_slice(&entry, &mut scratch).unwrap();
        let decoded: (String, Hash) = postcard::from_bytes(&scratch).unwrap();
        assert_eq!(entry, decoded);
    }

    /// Collection metadata survives a postcard encode/decode cycle.
    #[test]
    fn roundtrip_collection_meta() {
        let names = vec!["test".to_string(), "a".to_string(), "b".to_string()];
        let original = CollectionMeta {
            header: *Collection::HEADER,
            names,
        };

        let mut scratch = bytes::BytesMut::zeroed(1024);
        postcard::to_slice(&original, &mut scratch).unwrap();
        let decoded: CollectionMeta = postcard::from_bytes(&scratch).unwrap();
        assert_eq!(original, decoded);
    }

    /// Blobs produced by `to_blobs` can be read back with `Collection::load`.
    #[tokio::test]
    async fn collection_store_load() -> testresult::TestResult {
        let original: Collection = (0..3)
            .map(|i| {
                let hash = crate::Hash::from(blake3::hash(&[i as u8]));
                (format!("blob{i}"), hash)
            })
            .collect();

        // Key every serialized blob by its hash; the root is the last blob yielded.
        let blobs: Vec<(Hash, Bytes)> = original
            .to_blobs()
            .map(|data| (crate::Hash::from(blake3::hash(&data)), data))
            .collect();
        let root = blobs.last().map(|(hash, _)| *hash);
        let store: TestStore = blobs.into_iter().collect();

        let loaded = Collection::load(root.unwrap(), &store).await?;
        assert_eq!(original, loaded);
        Ok(())
    }

    /// In-memory [`SimpleStore`] backed by a map from hash to content.
    struct TestStore(BTreeMap<Hash, Bytes>);

    impl FromIterator<(Hash, Bytes)> for TestStore {
        fn from_iter<T: IntoIterator<Item = (Hash, Bytes)>>(iter: T) -> Self {
            let entries: BTreeMap<Hash, Bytes> = iter.into_iter().collect();
            Self(entries)
        }
    }

    impl SimpleStore for TestStore {
        async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
            let Self(entries) = self;
            entries.get(&hash).cloned().context("not found")
        }
    }
}