iroh_bytes/store/mem.rs

1//! A full in memory database for iroh-bytes
2//!
3//! Main entry point is [Store].
4use bao_tree::{
5    io::{fsm::Outboard, outboard::PreOrderOutboard, sync::WriteAt},
6    BaoTree,
7};
8use bytes::{Bytes, BytesMut};
9use futures_lite::{Stream, StreamExt};
10use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
11use iroh_io::AsyncSliceReader;
12use std::{
13    collections::BTreeMap,
14    io,
15    path::PathBuf,
16    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
17    time::SystemTime,
18};
19
20use crate::{
21    store::{
22        mutable_mem_storage::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore,
23    },
24    util::{
25        progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
26        LivenessTracker,
27    },
28    Tag, TempTag, IROH_BLOCK_SIZE,
29};
30
31use super::{
32    temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode,
33    ImportProgress, Map, TempCounterMap,
34};
35
36/// A fully featured in memory database for iroh-bytes, including support for
37/// partial blobs.
38#[derive(Debug, Clone, Default)]
39pub struct Store {
40    inner: Arc<StoreInner>,
41}
42
43#[derive(Debug, Default)]
44struct StoreInner(RwLock<StateInner>);
45
46impl LivenessTracker for StoreInner {
47    fn on_clone(&self, inner: &HashAndFormat) {
48        tracing::trace!("temp tagging: {:?}", inner);
49        let mut state = self.0.write().unwrap();
50        state.temp.inc(inner);
51    }
52
53    fn on_drop(&self, inner: &HashAndFormat) {
54        tracing::trace!("temp tag drop: {:?}", inner);
55        let mut state = self.0.write().unwrap();
56        state.temp.dec(inner);
57    }
58}
59
60impl Store {
61    /// Create a new in memory store
62    pub fn new() -> Self {
63        Self::default()
64    }
65
66    /// Take a write lock on the store
67    fn write_lock(&self) -> RwLockWriteGuard<'_, StateInner> {
68        self.inner.0.write().unwrap()
69    }
70
71    /// Take a read lock on the store
72    fn read_lock(&self) -> RwLockReadGuard<'_, StateInner> {
73        self.inner.0.read().unwrap()
74    }
75
76    fn import_bytes_sync(
77        &self,
78        id: u64,
79        bytes: Bytes,
80        format: BlobFormat,
81        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
82    ) -> io::Result<TempTag> {
83        progress.blocking_send(ImportProgress::OutboardProgress { id, offset: 0 })?;
84        let (storage, hash) = MutableMemStorage::complete(bytes);
85        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
86        use super::Store;
87        let tag = self.temp_tag(HashAndFormat { hash, format });
88        let entry = Entry {
89            inner: Arc::new(EntryInner {
90                hash,
91                data: RwLock::new(storage),
92            }),
93            complete: true,
94        };
95        self.write_lock().entries.insert(hash, entry);
96        Ok(tag)
97    }
98
99    fn export_sync(
100        &self,
101        hash: Hash,
102        target: PathBuf,
103        _mode: ExportMode,
104        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
105    ) -> io::Result<()> {
106        tracing::trace!("exporting {} to {}", hash, target.display());
107
108        if !target.is_absolute() {
109            return Err(io::Error::new(
110                io::ErrorKind::InvalidInput,
111                "target path must be absolute",
112            ));
113        }
114        let parent = target.parent().ok_or_else(|| {
115            io::Error::new(
116                io::ErrorKind::InvalidInput,
117                "target path has no parent directory",
118            )
119        })?;
120        // create the directory in which the target file is
121        std::fs::create_dir_all(parent)?;
122        let state = self.read_lock();
123        let entry = state
124            .entries
125            .get(&hash)
126            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
127        let reader = &entry.inner.data;
128        let size = reader.read().unwrap().current_size();
129        let mut file = std::fs::File::create(target)?;
130        for offset in (0..size).step_by(1024 * 1024) {
131            let bytes = reader.read().unwrap().read_data_at(offset, 1024 * 1024);
132            file.write_at(offset, &bytes)?;
133            progress(offset)?;
134        }
135        std::io::Write::flush(&mut file)?;
136        drop(file);
137        Ok(())
138    }
139}
140
141impl super::Store for Store {
142    async fn import_file(
143        &self,
144        path: std::path::PathBuf,
145        _mode: ImportMode,
146        format: BlobFormat,
147        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
148    ) -> io::Result<(TempTag, u64)> {
149        let this = self.clone();
150        tokio::task::spawn_blocking(move || {
151            let id = progress.new_id();
152            progress.blocking_send(ImportProgress::Found {
153                id,
154                name: path.to_string_lossy().to_string(),
155            })?;
156            progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
157            // todo: provide progress for reading into mem
158            let bytes: Bytes = std::fs::read(path)?.into();
159            let size = bytes.len() as u64;
160            progress.blocking_send(ImportProgress::Size { id, size })?;
161            let tag = this.import_bytes_sync(id, bytes, format, progress)?;
162            Ok((tag, size))
163        })
164        .await?
165    }
166
167    async fn import_stream(
168        &self,
169        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
170        format: BlobFormat,
171        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
172    ) -> io::Result<(TempTag, u64)> {
173        let this = self.clone();
174        let id = progress.new_id();
175        let name = temp_name();
176        progress.send(ImportProgress::Found { id, name }).await?;
177        let mut bytes = BytesMut::new();
178        while let Some(chunk) = data.next().await {
179            bytes.extend_from_slice(&chunk?);
180            progress
181                .try_send(ImportProgress::CopyProgress {
182                    id,
183                    offset: bytes.len() as u64,
184                })
185                .ok();
186        }
187        let bytes = bytes.freeze();
188        let size = bytes.len() as u64;
189        progress.blocking_send(ImportProgress::Size { id, size })?;
190        let tag = this.import_bytes_sync(id, bytes, format, progress)?;
191        Ok((tag, size))
192    }
193
194    async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
195        let this = self.clone();
196        tokio::task::spawn_blocking(move || {
197            this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
198        })
199        .await?
200    }
201
202    async fn set_tag(&self, name: Tag, value: Option<HashAndFormat>) -> io::Result<()> {
203        let mut state = self.write_lock();
204        if let Some(value) = value {
205            state.tags.insert(name, value);
206        } else {
207            state.tags.remove(&name);
208        }
209        Ok(())
210    }
211
212    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
213        let mut state = self.write_lock();
214        let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x));
215        state.tags.insert(tag.clone(), hash);
216        Ok(tag)
217    }
218
219    fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
220        TempTag::new(tag, Some(self.inner.clone()))
221    }
222
223    async fn gc_start(&self) -> io::Result<()> {
224        Ok(())
225    }
226
227    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
228        let mut state = self.write_lock();
229        for hash in hashes {
230            if !state.temp.contains(&hash) {
231                state.entries.remove(&hash);
232            }
233        }
234        Ok(())
235    }
236
237    async fn shutdown(&self) {}
238}
239
240#[derive(Debug, Default)]
241struct StateInner {
242    entries: BTreeMap<Hash, Entry>,
243    tags: BTreeMap<Tag, HashAndFormat>,
244    temp: TempCounterMap,
245}
246
247/// An in memory entry
248#[derive(Debug, Clone)]
249pub struct Entry {
250    inner: Arc<EntryInner>,
251    complete: bool,
252}
253
254#[derive(Debug)]
255struct EntryInner {
256    hash: Hash,
257    data: RwLock<MutableMemStorage>,
258}
259
260impl MapEntry for Entry {
261    fn hash(&self) -> Hash {
262        self.inner.hash
263    }
264
265    fn size(&self) -> BaoBlobSize {
266        let size = self.inner.data.read().unwrap().current_size();
267        BaoBlobSize::new(size, self.complete)
268    }
269
270    fn is_complete(&self) -> bool {
271        self.complete
272    }
273
274    async fn outboard(&self) -> io::Result<impl Outboard> {
275        let size = self.inner.data.read().unwrap().current_size();
276        Ok(PreOrderOutboard {
277            root: self.hash().into(),
278            tree: BaoTree::new(size, IROH_BLOCK_SIZE),
279            data: OutboardReader(self.inner.clone()),
280        })
281    }
282
283    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
284        Ok(DataReader(self.inner.clone()))
285    }
286}
287
288impl MapEntryMut for Entry {
289    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
290        Ok(BatchWriter(self.inner.clone()))
291    }
292}
293
294struct DataReader(Arc<EntryInner>);
295
296impl AsyncSliceReader for DataReader {
297    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
298        Ok(self.0.data.read().unwrap().read_data_at(offset, len))
299    }
300
301    async fn size(&mut self) -> std::io::Result<u64> {
302        Ok(self.0.data.read().unwrap().data_len())
303    }
304}
305
306struct OutboardReader(Arc<EntryInner>);
307
308impl AsyncSliceReader for OutboardReader {
309    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
310        Ok(self.0.data.read().unwrap().read_outboard_at(offset, len))
311    }
312
313    async fn size(&mut self) -> std::io::Result<u64> {
314        Ok(self.0.data.read().unwrap().outboard_len())
315    }
316}
317
318struct BatchWriter(Arc<EntryInner>);
319
320impl super::BaoBatchWriter for BatchWriter {
321    async fn write_batch(
322        &mut self,
323        size: u64,
324        batch: Vec<bao_tree::io::fsm::BaoContentItem>,
325    ) -> io::Result<()> {
326        self.0.data.write().unwrap().write_batch(size, &batch)
327    }
328
329    async fn sync(&mut self) -> io::Result<()> {
330        Ok(())
331    }
332}
333
334impl super::Map for Store {
335    type Entry = Entry;
336
337    async fn get(&self, hash: &Hash) -> std::io::Result<Option<Self::Entry>> {
338        Ok(self.inner.0.read().unwrap().entries.get(hash).cloned())
339    }
340}
341
342impl super::MapMut for Store {
343    type EntryMut = Entry;
344
345    async fn get_mut(&self, hash: &Hash) -> std::io::Result<Option<Self::EntryMut>> {
346        self.get(hash).await
347    }
348
349    async fn get_or_create(&self, hash: Hash, _size: u64) -> std::io::Result<Entry> {
350        let entry = Entry {
351            inner: Arc::new(EntryInner {
352                hash,
353                data: RwLock::new(MutableMemStorage::default()),
354            }),
355            complete: false,
356        };
357        Ok(entry)
358    }
359
360    async fn entry_status(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
361        self.entry_status_sync(hash)
362    }
363
364    fn entry_status_sync(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
365        Ok(match self.inner.0.read().unwrap().entries.get(hash) {
366            Some(entry) => {
367                if entry.complete {
368                    crate::store::EntryStatus::Complete
369                } else {
370                    crate::store::EntryStatus::Partial
371                }
372            }
373            None => crate::store::EntryStatus::NotFound,
374        })
375    }
376
377    async fn insert_complete(&self, mut entry: Entry) -> std::io::Result<()> {
378        let hash = entry.hash();
379        let mut inner = self.inner.0.write().unwrap();
380        let complete = inner
381            .entries
382            .get(&hash)
383            .map(|x| x.complete)
384            .unwrap_or_default();
385        if !complete {
386            entry.complete = true;
387            inner.entries.insert(hash, entry);
388        }
389        Ok(())
390    }
391}
392
393impl ReadableStore for Store {
394    async fn blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
395        let entries = self.read_lock().entries.clone();
396        Ok(Box::new(
397            entries
398                .into_values()
399                .filter(|x| x.complete)
400                .map(|x| Ok(x.hash())),
401        ))
402    }
403
404    async fn partial_blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
405        let entries = self.read_lock().entries.clone();
406        Ok(Box::new(
407            entries
408                .into_values()
409                .filter(|x| !x.complete)
410                .map(|x| Ok(x.hash())),
411        ))
412    }
413
414    async fn tags(
415        &self,
416    ) -> io::Result<crate::store::DbIter<(crate::Tag, iroh_base::hash::HashAndFormat)>> {
417        #[allow(clippy::mutable_key_type)]
418        let tags = self.read_lock().tags.clone();
419        Ok(Box::new(tags.into_iter().map(Ok)))
420    }
421
422    fn temp_tags(
423        &self,
424    ) -> Box<dyn Iterator<Item = iroh_base::hash::HashAndFormat> + Send + Sync + 'static> {
425        let tags = self.read_lock().temp.keys();
426        Box::new(tags)
427    }
428
429    async fn consistency_check(
430        &self,
431        _repair: bool,
432        _tx: BoxedProgressSender<ConsistencyCheckProgress>,
433    ) -> io::Result<()> {
434        todo!()
435    }
436
437    async fn export(
438        &self,
439        hash: Hash,
440        target: std::path::PathBuf,
441        mode: crate::store::ExportMode,
442        progress: ExportProgressCb,
443    ) -> io::Result<()> {
444        let this = self.clone();
445        tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress)).await?
446    }
447}