iroh_bytes/store/
readonly_mem.rs

1//! A readonly in memory database for iroh-bytes, usable for testing and sharing static data.
2//!
3//! Main entry point is [Store].
4use std::{
5    collections::{BTreeMap, HashMap},
6    io,
7    path::PathBuf,
8    sync::Arc,
9};
10
11use crate::{
12    store::{
13        EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut,
14        ReadableStore,
15    },
16    util::{
17        progress::{BoxedProgressSender, IdGenerator, ProgressSender},
18        Tag,
19    },
20    BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
21};
22use bao_tree::{
23    blake3,
24    io::{outboard::PreOrderMemOutboard, sync::Outboard},
25};
26use bytes::Bytes;
27use futures_lite::Stream;
28use iroh_io::AsyncSliceReader;
29use tokio::io::AsyncWriteExt;
30
31use super::{BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, DbIter, ExportProgressCb};
32
33/// A readonly in memory database for iroh-bytes.
34///
35/// This is basically just a HashMap, so it does not allow for any modifications
36/// unless you have a mutable reference to it.
37///
38/// It is therefore useful mostly for testing and sharing static data.
39#[derive(Debug, Clone, Default)]
40pub struct Store(Arc<HashMap<Hash, (PreOrderMemOutboard<Bytes>, Bytes)>>);
41
42impl<K, V> FromIterator<(K, V)> for Store
43where
44    K: Into<String>,
45    V: AsRef<[u8]>,
46{
47    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
48        let (db, _m) = Self::new(iter);
49        db
50    }
51}
52
53impl Store {
54    /// Create a new [Store] from a sequence of entries.
55    ///
56    /// Returns the database and a map of names to computed blake3 hashes.
57    /// In case of duplicate names, the last entry is used.
58    pub fn new(
59        entries: impl IntoIterator<Item = (impl Into<String>, impl AsRef<[u8]>)>,
60    ) -> (Self, BTreeMap<String, blake3::Hash>) {
61        let mut names = BTreeMap::new();
62        let mut res = HashMap::new();
63        for (name, data) in entries.into_iter() {
64            let name = name.into();
65            let data: &[u8] = data.as_ref();
66            // wrap into the right types
67            let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
68            let hash = outboard.root();
69            // add the name, this assumes that names are unique
70            names.insert(name, hash);
71            let data = Bytes::from(data.to_vec());
72            let hash = Hash::from(hash);
73            res.insert(hash, (outboard, data));
74        }
75        (Self(Arc::new(res)), names)
76    }
77
78    /// Insert a new entry into the database, and return the hash of the entry.
79    ///
80    /// If the database was shared before, this will make a copy.
81    pub fn insert(&mut self, data: impl AsRef<[u8]>) -> Hash {
82        let inner = Arc::make_mut(&mut self.0);
83        let data: &[u8] = data.as_ref();
84        // wrap into the right types
85        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
86        let hash = outboard.root();
87        let data = Bytes::from(data.to_vec());
88        let hash = Hash::from(hash);
89        inner.insert(hash, (outboard, data));
90        hash
91    }
92
93    /// Insert multiple entries into the database, and return the hash of the last entry.
94    pub fn insert_many(
95        &mut self,
96        items: impl IntoIterator<Item = impl AsRef<[u8]>>,
97    ) -> Option<Hash> {
98        let mut hash = None;
99        for item in items.into_iter() {
100            hash = Some(self.insert(item));
101        }
102        hash
103    }
104
105    /// Get the bytes associated with a hash, if they exist.
106    pub fn get_content(&self, hash: &Hash) -> Option<Bytes> {
107        let entry = self.0.get(hash)?;
108        Some(entry.1.clone())
109    }
110
111    async fn export_impl(
112        &self,
113        hash: Hash,
114        target: PathBuf,
115        _mode: ExportMode,
116        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
117    ) -> io::Result<()> {
118        tracing::trace!("exporting {} to {}", hash, target.display());
119
120        if !target.is_absolute() {
121            return Err(io::Error::new(
122                io::ErrorKind::InvalidInput,
123                "target path must be absolute",
124            ));
125        }
126        let parent = target.parent().ok_or_else(|| {
127            io::Error::new(
128                io::ErrorKind::InvalidInput,
129                "target path has no parent directory",
130            )
131        })?;
132        // create the directory in which the target file is
133        tokio::fs::create_dir_all(parent).await?;
134        let data = self
135            .get_content(&hash)
136            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
137
138        let mut offset = 0u64;
139        let mut file = tokio::fs::File::create(&target).await?;
140        for chunk in data.chunks(1024 * 1024) {
141            progress(offset)?;
142            file.write_all(chunk).await?;
143            offset += chunk.len() as u64;
144        }
145        file.sync_all().await?;
146        drop(file);
147        Ok(())
148    }
149}
150
151/// The [MapEntry] implementation for [Store].
152#[derive(Debug, Clone)]
153pub struct Entry {
154    outboard: PreOrderMemOutboard<Bytes>,
155    data: Bytes,
156}
157
158impl MapEntry for Entry {
159    fn hash(&self) -> Hash {
160        self.outboard.root().into()
161    }
162
163    fn size(&self) -> BaoBlobSize {
164        BaoBlobSize::Verified(self.data.len() as u64)
165    }
166
167    async fn outboard(&self) -> io::Result<impl bao_tree::io::fsm::Outboard> {
168        Ok(self.outboard.clone())
169    }
170
171    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
172        Ok(self.data.clone())
173    }
174
175    fn is_complete(&self) -> bool {
176        true
177    }
178}
179
180impl Map for Store {
181    type Entry = Entry;
182
183    async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
184        Ok(self.0.get(hash).map(|(o, d)| Entry {
185            outboard: o.clone(),
186            data: d.clone(),
187        }))
188    }
189}
190
191impl super::MapMut for Store {
192    type EntryMut = Entry;
193
194    async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
195        self.get(hash).await
196    }
197
198    async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result<Entry> {
199        Err(io::Error::new(
200            io::ErrorKind::Other,
201            "cannot create temp entry in readonly database",
202        ))
203    }
204
205    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
206        Ok(match self.0.contains_key(hash) {
207            true => EntryStatus::Complete,
208            false => EntryStatus::NotFound,
209        })
210    }
211
212    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
213        self.entry_status_sync(hash)
214    }
215
216    async fn insert_complete(&self, _entry: Entry) -> io::Result<()> {
217        // this is unreachable, since we cannot create partial entries
218        unreachable!()
219    }
220}
221
222impl ReadableStore for Store {
223    async fn blobs(&self) -> io::Result<DbIter<Hash>> {
224        Ok(Box::new(
225            self.0
226                .keys()
227                .copied()
228                .map(Ok)
229                .collect::<Vec<_>>()
230                .into_iter(),
231        ))
232    }
233
234    async fn tags(&self) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
235        Ok(Box::new(std::iter::empty()))
236    }
237
238    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
239        Box::new(std::iter::empty())
240    }
241
242    async fn consistency_check(
243        &self,
244        _repair: bool,
245        _tx: BoxedProgressSender<ConsistencyCheckProgress>,
246    ) -> io::Result<()> {
247        Ok(())
248    }
249
250    async fn export(
251        &self,
252        hash: Hash,
253        target: PathBuf,
254        mode: ExportMode,
255        progress: ExportProgressCb,
256    ) -> io::Result<()> {
257        self.export_impl(hash, target, mode, progress).await
258    }
259
260    async fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
261        Ok(Box::new(std::iter::empty()))
262    }
263}
264
265impl MapEntryMut for Entry {
266    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
267        enum Bar {}
268        impl BaoBatchWriter for Bar {
269            async fn write_batch(
270                &mut self,
271                _size: u64,
272                _batch: Vec<bao_tree::io::fsm::BaoContentItem>,
273            ) -> io::Result<()> {
274                unreachable!()
275            }
276
277            async fn sync(&mut self) -> io::Result<()> {
278                unreachable!()
279            }
280        }
281
282        #[allow(unreachable_code)]
283        Ok(unreachable!() as Bar)
284    }
285}
286
287impl super::Store for Store {
288    async fn import_file(
289        &self,
290        data: PathBuf,
291        mode: ImportMode,
292        format: BlobFormat,
293        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
294    ) -> io::Result<(TempTag, u64)> {
295        let _ = (data, mode, progress, format);
296        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
297    }
298
299    /// import a byte slice
300    async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
301        let _ = (bytes, format);
302        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
303    }
304
305    async fn import_stream(
306        &self,
307        data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send,
308        format: BlobFormat,
309        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
310    ) -> io::Result<(TempTag, u64)> {
311        let _ = (data, format, progress);
312        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
313    }
314
315    async fn set_tag(&self, _name: Tag, _hash: Option<HashAndFormat>) -> io::Result<()> {
316        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
317    }
318
319    async fn create_tag(&self, _hash: HashAndFormat) -> io::Result<Tag> {
320        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
321    }
322
323    fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
324        TempTag::new(inner, None)
325    }
326
327    async fn gc_start(&self) -> io::Result<()> {
328        Ok(())
329    }
330
331    async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
332        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
333    }
334
335    async fn shutdown(&self) {}
336}