iroh_blobs/store/
readonly_mem.rs

//! A read-only, in-memory database for iroh-blobs, usable for testing and sharing static data.
//!
//! The main entry point is [Store].
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap},
6    future::Future,
7    io,
8    path::PathBuf,
9    sync::Arc,
10};
11
12use bao_tree::{
13    blake3,
14    io::{outboard::PreOrderMemOutboard, sync::Outboard},
15};
16use bytes::Bytes;
17use futures_lite::Stream;
18use iroh_io::AsyncSliceReader;
19use tokio::io::AsyncWriteExt;
20
21use super::{BaoBatchWriter, BaoBlobSize, ConsistencyCheckProgress, DbIter, ExportProgressCb};
22use crate::{
23    store::{
24        EntryStatus, ExportMode, ImportMode, ImportProgress, Map, MapEntry, MapEntryMut,
25        ReadableStore,
26    },
27    util::{
28        progress::{BoxedProgressSender, IdGenerator, ProgressSender},
29        Tag,
30    },
31    BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
32};
33
34/// A readonly in memory database for iroh-blobs.
35///
36/// This is basically just a HashMap, so it does not allow for any modifications
37/// unless you have a mutable reference to it.
38///
39/// It is therefore useful mostly for testing and sharing static data.
40#[derive(Debug, Clone, Default)]
41pub struct Store(Arc<HashMap<Hash, (PreOrderMemOutboard<Bytes>, Bytes)>>);
42
43impl<K, V> FromIterator<(K, V)> for Store
44where
45    K: Into<String>,
46    V: AsRef<[u8]>,
47{
48    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
49        let (db, _m) = Self::new(iter);
50        db
51    }
52}
53
54impl Store {
55    /// Create a new [Store] from a sequence of entries.
56    ///
57    /// Returns the database and a map of names to computed blake3 hashes.
58    /// In case of duplicate names, the last entry is used.
59    pub fn new(
60        entries: impl IntoIterator<Item = (impl Into<String>, impl AsRef<[u8]>)>,
61    ) -> (Self, BTreeMap<String, blake3::Hash>) {
62        let mut names = BTreeMap::new();
63        let mut res = HashMap::new();
64        for (name, data) in entries.into_iter() {
65            let name = name.into();
66            let data: &[u8] = data.as_ref();
67            // wrap into the right types
68            let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
69            let hash = outboard.root();
70            // add the name, this assumes that names are unique
71            names.insert(name, hash);
72            let data = Bytes::from(data.to_vec());
73            let hash = Hash::from(hash);
74            res.insert(hash, (outboard, data));
75        }
76        (Self(Arc::new(res)), names)
77    }
78
79    /// Insert a new entry into the database, and return the hash of the entry.
80    ///
81    /// If the database was shared before, this will make a copy.
82    pub fn insert(&mut self, data: impl AsRef<[u8]>) -> Hash {
83        let inner = Arc::make_mut(&mut self.0);
84        let data: &[u8] = data.as_ref();
85        // wrap into the right types
86        let outboard = PreOrderMemOutboard::create(data, IROH_BLOCK_SIZE).map_data(Bytes::from);
87        let hash = outboard.root();
88        let data = Bytes::from(data.to_vec());
89        let hash = Hash::from(hash);
90        inner.insert(hash, (outboard, data));
91        hash
92    }
93
94    /// Insert multiple entries into the database, and return the hash of the last entry.
95    pub fn insert_many(
96        &mut self,
97        items: impl IntoIterator<Item = impl AsRef<[u8]>>,
98    ) -> Option<Hash> {
99        let mut hash = None;
100        for item in items.into_iter() {
101            hash = Some(self.insert(item));
102        }
103        hash
104    }
105
106    /// Get the bytes associated with a hash, if they exist.
107    pub fn get_content(&self, hash: &Hash) -> Option<Bytes> {
108        let entry = self.0.get(hash)?;
109        Some(entry.1.clone())
110    }
111
112    async fn export_impl(
113        &self,
114        hash: Hash,
115        target: PathBuf,
116        _mode: ExportMode,
117        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
118    ) -> io::Result<()> {
119        tracing::trace!("exporting {} to {}", hash, target.display());
120
121        if !target.is_absolute() {
122            return Err(io::Error::new(
123                io::ErrorKind::InvalidInput,
124                "target path must be absolute",
125            ));
126        }
127        let parent = target.parent().ok_or_else(|| {
128            io::Error::new(
129                io::ErrorKind::InvalidInput,
130                "target path has no parent directory",
131            )
132        })?;
133        // create the directory in which the target file is
134        tokio::fs::create_dir_all(parent).await?;
135        let data = self
136            .get_content(&hash)
137            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
138
139        let mut offset = 0u64;
140        let mut file = tokio::fs::File::create(&target).await?;
141        for chunk in data.chunks(1024 * 1024) {
142            progress(offset)?;
143            file.write_all(chunk).await?;
144            offset += chunk.len() as u64;
145        }
146        file.sync_all().await?;
147        drop(file);
148        Ok(())
149    }
150}
151
152/// The [MapEntry] implementation for [Store].
153#[derive(Debug, Clone)]
154pub struct Entry {
155    outboard: PreOrderMemOutboard<Bytes>,
156    data: Bytes,
157}
158
159impl MapEntry for Entry {
160    fn hash(&self) -> Hash {
161        self.outboard.root().into()
162    }
163
164    fn size(&self) -> BaoBlobSize {
165        BaoBlobSize::Verified(self.data.len() as u64)
166    }
167
168    async fn outboard(&self) -> io::Result<impl bao_tree::io::fsm::Outboard> {
169        Ok(self.outboard.clone())
170    }
171
172    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
173        Ok(self.data.clone())
174    }
175
176    fn is_complete(&self) -> bool {
177        true
178    }
179}
180
181impl Map for Store {
182    type Entry = Entry;
183
184    async fn get(&self, hash: &Hash) -> io::Result<Option<Self::Entry>> {
185        Ok(self.0.get(hash).map(|(o, d)| Entry {
186            outboard: o.clone(),
187            data: d.clone(),
188        }))
189    }
190}
191
192impl super::MapMut for Store {
193    type EntryMut = Entry;
194
195    async fn get_mut(&self, hash: &Hash) -> io::Result<Option<Self::EntryMut>> {
196        self.get(hash).await
197    }
198
199    async fn get_or_create(&self, _hash: Hash, _size: u64) -> io::Result<Entry> {
200        Err(io::Error::new(
201            io::ErrorKind::Other,
202            "cannot create temp entry in readonly database",
203        ))
204    }
205
206    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus> {
207        Ok(match self.0.contains_key(hash) {
208            true => EntryStatus::Complete,
209            false => EntryStatus::NotFound,
210        })
211    }
212
213    async fn entry_status(&self, hash: &Hash) -> io::Result<EntryStatus> {
214        self.entry_status_sync(hash)
215    }
216
217    async fn insert_complete(&self, _entry: Entry) -> io::Result<()> {
218        // this is unreachable, since we cannot create partial entries
219        unreachable!()
220    }
221}
222
223impl ReadableStore for Store {
224    async fn blobs(&self) -> io::Result<DbIter<Hash>> {
225        Ok(Box::new(
226            self.0
227                .keys()
228                .copied()
229                .map(Ok)
230                .collect::<Vec<_>>()
231                .into_iter(),
232        ))
233    }
234
235    async fn tags(
236        &self,
237        _from: Option<Tag>,
238        _to: Option<Tag>,
239    ) -> io::Result<DbIter<(Tag, HashAndFormat)>> {
240        Ok(Box::new(std::iter::empty()))
241    }
242
243    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static> {
244        Box::new(std::iter::empty())
245    }
246
247    async fn consistency_check(
248        &self,
249        _repair: bool,
250        _tx: BoxedProgressSender<ConsistencyCheckProgress>,
251    ) -> io::Result<()> {
252        Ok(())
253    }
254
255    async fn export(
256        &self,
257        hash: Hash,
258        target: PathBuf,
259        mode: ExportMode,
260        progress: ExportProgressCb,
261    ) -> io::Result<()> {
262        self.export_impl(hash, target, mode, progress).await
263    }
264
265    async fn partial_blobs(&self) -> io::Result<DbIter<Hash>> {
266        Ok(Box::new(std::iter::empty()))
267    }
268}
269
270impl MapEntryMut for Entry {
271    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
272        enum Bar {}
273        impl BaoBatchWriter for Bar {
274            async fn write_batch(
275                &mut self,
276                _size: u64,
277                _batch: Vec<bao_tree::io::fsm::BaoContentItem>,
278            ) -> io::Result<()> {
279                unreachable!()
280            }
281
282            async fn sync(&mut self) -> io::Result<()> {
283                unreachable!()
284            }
285        }
286
287        #[allow(unreachable_code)]
288        Ok(unreachable!() as Bar)
289    }
290}
291
292impl super::Store for Store {
293    async fn import_file(
294        &self,
295        data: PathBuf,
296        mode: ImportMode,
297        format: BlobFormat,
298        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
299    ) -> io::Result<(TempTag, u64)> {
300        let _ = (data, mode, progress, format);
301        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
302    }
303
304    /// import a byte slice
305    async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
306        let _ = (bytes, format);
307        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
308    }
309
310    async fn rename_tag(&self, _from: Tag, _to: Tag) -> io::Result<()> {
311        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
312    }
313
314    async fn import_stream(
315        &self,
316        data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send,
317        format: BlobFormat,
318        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
319    ) -> io::Result<(TempTag, u64)> {
320        let _ = (data, format, progress);
321        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
322    }
323
324    async fn set_tag(&self, _name: Tag, _hash: HashAndFormat) -> io::Result<()> {
325        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
326    }
327
328    async fn delete_tags(&self, _from: Option<Tag>, _to: Option<Tag>) -> io::Result<()> {
329        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
330    }
331
332    async fn create_tag(&self, _hash: HashAndFormat) -> io::Result<Tag> {
333        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
334    }
335
336    fn temp_tag(&self, inner: HashAndFormat) -> TempTag {
337        TempTag::new(inner, None)
338    }
339
340    async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
341    where
342        G: Fn() -> Gut,
343        Gut: Future<Output = BTreeSet<Hash>> + Send,
344    {
345        super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
346    }
347
348    async fn delete(&self, _hashes: Vec<Hash>) -> io::Result<()> {
349        Err(io::Error::new(io::ErrorKind::Other, "not implemented"))
350    }
351
352    async fn shutdown(&self) {}
353
354    async fn sync(&self) -> io::Result<()> {
355        Ok(())
356    }
357}