iroh_blobs/store/
mem.rs

//! A full in-memory database for iroh-blobs
//!
//! The main entry point is [Store].
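//!
//! # Example
//!
//! A minimal sketch of typical usage, assuming a tokio runtime and that the
//! [`super::Store`] trait is in scope (`import_bytes` is a method of that trait):
//!
//! ```ignore
//! use iroh_blobs::{store::{mem, Store as _}, BlobFormat};
//!
//! async fn demo() -> std::io::Result<()> {
//!     let store = mem::Store::new();
//!     // Import a blob; the returned TempTag keeps it alive until it is
//!     // dropped or replaced by a persistent tag.
//!     let tag = store
//!         .import_bytes(bytes::Bytes::from_static(b"hello"), BlobFormat::Raw)
//!         .await?;
//!     println!("imported blob with hash {}", tag.hash());
//!     Ok(())
//! }
//! ```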
use std::{
    collections::{BTreeMap, BTreeSet},
    future::Future,
    io,
    path::PathBuf,
    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
    time::SystemTime,
};

use bao_tree::{
    io::{fsm::Outboard, outboard::PreOrderOutboard, sync::WriteAt},
    BaoTree,
};
use bytes::{Bytes, BytesMut};
use futures_lite::{Stream, StreamExt};
use iroh_io::AsyncSliceReader;

use super::{
    temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode,
    ImportProgress, Map, TempCounterMap,
};
use crate::{
    store::{
        mutable_mem_storage::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore,
    },
    util::{
        progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
        TagCounter, TagDrop,
    },
    BlobFormat, Hash, HashAndFormat, Tag, TempTag, IROH_BLOCK_SIZE,
};

/// A fully featured in-memory database for iroh-blobs, including support for
/// partial blobs.
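///
/// The store is cheaply cloneable; all clones share the same underlying state.
///
/// ## Tagging example
///
/// A minimal sketch of promoting a temp tag to a persistent tag, assuming the
/// [`super::Store`] trait is in scope and that `TempTag::hash` is available:
///
/// ```ignore
/// use iroh_blobs::{store::{mem, Store as _}, BlobFormat, HashAndFormat};
///
/// async fn tag_demo() -> std::io::Result<()> {
///     let store = mem::Store::new();
///     let temp = store
///         .import_bytes(bytes::Bytes::from_static(b"content"), BlobFormat::Raw)
///         .await?;
///     // Create a named tag so the blob stays alive after the temp tag is dropped.
///     let hash = *temp.hash();
///     let _tag = store
///         .create_tag(HashAndFormat { hash, format: BlobFormat::Raw })
///         .await?;
///     Ok(())
/// }
/// ```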
#[derive(Debug, Clone, Default)]
pub struct Store {
    inner: Arc<StoreInner>,
}

/// Shared state of the store, protected by a single [`RwLock`].
#[derive(Debug, Default)]
struct StoreInner(RwLock<StateInner>);

impl TagDrop for StoreInner {
    fn on_drop(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tag drop: {:?}", inner);
        let mut state = self.0.write().unwrap();
        state.temp.dec(inner);
    }
}

impl TagCounter for StoreInner {
    fn on_create(&self, inner: &HashAndFormat) {
        tracing::trace!("temp tagging: {:?}", inner);
        let mut state = self.0.write().unwrap();
        state.temp.inc(inner);
    }
}

impl Store {
    /// Create a new in-memory store
    pub fn new() -> Self {
        Self::default()
    }

    /// Take a write lock on the store
    fn write_lock(&self) -> RwLockWriteGuard<'_, StateInner> {
        self.inner.0.write().unwrap()
    }

    /// Take a read lock on the store
    fn read_lock(&self) -> RwLockReadGuard<'_, StateInner> {
        self.inner.0.read().unwrap()
    }

    fn import_bytes_sync(
        &self,
        id: u64,
        bytes: Bytes,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<TempTag> {
        progress.blocking_send(ImportProgress::OutboardProgress { id, offset: 0 })?;
        let progress2 = progress.clone();
        let cb = move |offset| {
            progress2
                .try_send(ImportProgress::OutboardProgress { id, offset })
                .ok();
        };
        let (storage, hash) = MutableMemStorage::complete(bytes, cb);
        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
        // bring the `Store` trait into scope so we can call `temp_tag`
        use super::Store;
        let tag = self.temp_tag(HashAndFormat { hash, format });
        let entry = Entry {
            inner: Arc::new(EntryInner {
                hash,
                data: RwLock::new(storage),
            }),
            complete: true,
        };
        self.write_lock().entries.insert(hash, entry);
        Ok(tag)
    }

    fn export_sync(
        &self,
        hash: Hash,
        target: PathBuf,
        _mode: ExportMode,
        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
    ) -> io::Result<()> {
        tracing::trace!("exporting {} to {}", hash, target.display());

        if !target.is_absolute() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path must be absolute",
            ));
        }
        let parent = target.parent().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "target path has no parent directory",
            )
        })?;
        // create the directory in which the target file lives
        std::fs::create_dir_all(parent)?;
        let state = self.read_lock();
        let entry = state
            .entries
            .get(&hash)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
        let reader = &entry.inner.data;
        let size = reader.read().unwrap().current_size();
        let mut file = std::fs::File::create(target)?;
        // copy the data to the target file in 1 MiB chunks
        for offset in (0..size).step_by(1024 * 1024) {
            let bytes = reader.read().unwrap().read_data_at(offset, 1024 * 1024);
            file.write_at(offset, &bytes)?;
            progress(offset)?;
        }
        std::io::Write::flush(&mut file)?;
        drop(file);
        Ok(())
    }
}

impl super::Store for Store {
    async fn import_file(
        &self,
        path: std::path::PathBuf,
        _mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || {
            let id = progress.new_id();
            progress.blocking_send(ImportProgress::Found {
                id,
                name: path.to_string_lossy().to_string(),
            })?;
            progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
            // todo: provide progress for reading into mem
            let bytes: Bytes = std::fs::read(path)?.into();
            let size = bytes.len() as u64;
            progress.blocking_send(ImportProgress::Size { id, size })?;
            let tag = this.import_bytes_sync(id, bytes, format, progress)?;
            Ok((tag, size))
        })
        .await?
    }

    async fn import_stream(
        &self,
        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> io::Result<(TempTag, u64)> {
        let this = self.clone();
        let id = progress.new_id();
        let name = temp_name();
        progress.send(ImportProgress::Found { id, name }).await?;
        let mut bytes = BytesMut::new();
        while let Some(chunk) = data.next().await {
            bytes.extend_from_slice(&chunk?);
            progress
                .try_send(ImportProgress::CopyProgress {
                    id,
                    offset: bytes.len() as u64,
                })
                .ok();
        }
        let bytes = bytes.freeze();
        let size = bytes.len() as u64;
        progress.blocking_send(ImportProgress::Size { id, size })?;
        let tag = this.import_bytes_sync(id, bytes, format, progress)?;
        Ok((tag, size))
    }

    async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || {
            this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
        })
        .await?
    }

    async fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> io::Result<()> {
        let mut state = self.write_lock();
        if let Some(value) = value {
            state.tags.insert(name, value);
        } else {
            state.tags.remove(&name);
        }
        Ok(())
    }

    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
        let mut state = self.write_lock();
        let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x));
        state.tags.insert(tag.clone(), hash);
        Ok(tag)
    }

    fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
        self.inner.temp_tag(tag)
    }

    async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send,
    {
        super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
    }

    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
        let mut state = self.write_lock();
        for hash in hashes {
            if !state.temp.contains(&hash) {
                state.entries.remove(&hash);
            }
        }
        Ok(())
    }

    async fn shutdown(&self) {}

    async fn sync(&self) -> io::Result<()> {
        Ok(())
    }
}

/// The mutable state of the store: entries, named tags, and temp tag counters.
#[derive(Debug, Default)]
struct StateInner {
    entries: BTreeMap<Hash, Entry>,
    tags: BTreeMap<Tag, HashAndFormat>,
    temp: TempCounterMap,
}

/// An in-memory entry
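///
/// A minimal sketch of reading the contents of an entry, assuming the entry was
/// obtained via [`super::Map::get`]:
///
/// ```ignore
/// use iroh_blobs::store::MapEntry;
/// use iroh_io::AsyncSliceReader;
///
/// async fn read_all(entry: iroh_blobs::store::mem::Entry) -> std::io::Result<bytes::Bytes> {
///     let mut reader = entry.data_reader().await?;
///     let size = reader.size().await?;
///     // Read the whole blob at once; real code would usually read in chunks.
///     reader.read_at(0, size as usize).await
/// }
/// ```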
#[derive(Debug, Clone)]
pub struct Entry {
    inner: Arc<EntryInner>,
    complete: bool,
}

/// The shared part of an entry: the hash and the (possibly partial) data.
#[derive(Debug)]
struct EntryInner {
    hash: Hash,
    data: RwLock<MutableMemStorage>,
}

impl MapEntry for Entry {
    fn hash(&self) -> Hash {
        self.inner.hash
    }

    fn size(&self) -> BaoBlobSize {
        let size = self.inner.data.read().unwrap().current_size();
        BaoBlobSize::new(size, self.complete)
    }

    fn is_complete(&self) -> bool {
        self.complete
    }

    async fn outboard(&self) -> io::Result<impl Outboard> {
        let size = self.inner.data.read().unwrap().current_size();
        Ok(PreOrderOutboard {
            root: self.hash().into(),
            tree: BaoTree::new(size, IROH_BLOCK_SIZE),
            data: OutboardReader(self.inner.clone()),
        })
    }

    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
        Ok(DataReader(self.inner.clone()))
    }
}

impl MapEntryMut for Entry {
    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
        Ok(BatchWriter(self.inner.clone()))
    }
}

/// Async reader over the data bytes of an entry.
struct DataReader(Arc<EntryInner>);

impl AsyncSliceReader for DataReader {
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        Ok(self.0.data.read().unwrap().read_data_at(offset, len))
    }

    async fn size(&mut self) -> std::io::Result<u64> {
        Ok(self.0.data.read().unwrap().data_len())
    }
}

/// Async reader over the outboard bytes of an entry.
struct OutboardReader(Arc<EntryInner>);

impl AsyncSliceReader for OutboardReader {
    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
        Ok(self.0.data.read().unwrap().read_outboard_at(offset, len))
    }

    async fn size(&mut self) -> std::io::Result<u64> {
        Ok(self.0.data.read().unwrap().outboard_len())
    }
}

/// Writer that applies batches of bao content items to an entry.
struct BatchWriter(Arc<EntryInner>);

impl super::BaoBatchWriter for BatchWriter {
    async fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<bao_tree::io::fsm::BaoContentItem>,
    ) -> io::Result<()> {
        self.0.data.write().unwrap().write_batch(size, &batch)
    }

    async fn sync(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl super::Map for Store {
    type Entry = Entry;

    async fn get(&self, hash: &Hash) -> std::io::Result<Option<Self::Entry>> {
        Ok(self.inner.0.read().unwrap().entries.get(hash).cloned())
    }
}

impl super::MapMut for Store {
    type EntryMut = Entry;

    async fn get_mut(&self, hash: &Hash) -> std::io::Result<Option<Self::EntryMut>> {
        self.get(hash).await
    }

    async fn get_or_create(&self, hash: Hash, _size: u64) -> std::io::Result<Entry> {
        let entry = Entry {
            inner: Arc::new(EntryInner {
                hash,
                data: RwLock::new(MutableMemStorage::default()),
            }),
            complete: false,
        };
        Ok(entry)
    }

    async fn entry_status(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
        self.entry_status_sync(hash)
    }

    fn entry_status_sync(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
        Ok(match self.inner.0.read().unwrap().entries.get(hash) {
            Some(entry) => {
                if entry.complete {
                    crate::store::EntryStatus::Complete
                } else {
                    crate::store::EntryStatus::Partial
                }
            }
            None => crate::store::EntryStatus::NotFound,
        })
    }

    async fn insert_complete(&self, mut entry: Entry) -> std::io::Result<()> {
        let hash = entry.hash();
        let mut inner = self.inner.0.write().unwrap();
        let complete = inner
            .entries
            .get(&hash)
            .map(|x| x.complete)
            .unwrap_or_default();
        if !complete {
            entry.complete = true;
            inner.entries.insert(hash, entry);
        }
        Ok(())
    }
}

impl ReadableStore for Store {
    async fn blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
        let entries = self.read_lock().entries.clone();
        Ok(Box::new(
            entries
                .into_values()
                .filter(|x| x.complete)
                .map(|x| Ok(x.hash())),
        ))
    }

    async fn partial_blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
        let entries = self.read_lock().entries.clone();
        Ok(Box::new(
            entries
                .into_values()
                .filter(|x| !x.complete)
                .map(|x| Ok(x.hash())),
        ))
    }

    async fn tags(&self) -> io::Result<crate::store::DbIter<(crate::Tag, crate::HashAndFormat)>> {
        #[allow(clippy::mutable_key_type)]
        let tags = self.read_lock().tags.clone();
        Ok(Box::new(tags.into_iter().map(Ok)))
    }

    fn temp_tags(&self) -> Box<dyn Iterator<Item = crate::HashAndFormat> + Send + Sync + 'static> {
        let tags = self.read_lock().temp.keys();
        Box::new(tags)
    }

    async fn consistency_check(
        &self,
        _repair: bool,
        _tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> io::Result<()> {
        todo!()
    }

    async fn export(
        &self,
        hash: Hash,
        target: std::path::PathBuf,
        mode: crate::store::ExportMode,
        progress: ExportProgressCb,
    ) -> io::Result<()> {
        let this = self.clone();
        tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress)).await?
    }
}