iroh_blobs/store/
mem.rs

//! A full in-memory database for iroh-blobs.
//!
//! The main entry point is [`Store`].
4use std::{
5    collections::{BTreeMap, BTreeSet},
6    future::Future,
7    io,
8    path::PathBuf,
9    sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
10    time::SystemTime,
11};
12
13use bao_tree::{
14    io::{fsm::Outboard, outboard::PreOrderOutboard, sync::WriteAt},
15    BaoTree,
16};
17use bytes::{Bytes, BytesMut};
18use futures_lite::{Stream, StreamExt};
19use iroh_io::AsyncSliceReader;
20use tracing::info;
21
22use super::{
23    temp_name, BaoBatchWriter, ConsistencyCheckProgress, ExportMode, ExportProgressCb, ImportMode,
24    ImportProgress, Map, TempCounterMap,
25};
26use crate::{
27    store::{
28        mutable_mem_storage::MutableMemStorage, BaoBlobSize, MapEntry, MapEntryMut, ReadableStore,
29    },
30    util::{
31        progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender},
32        TagCounter, TagDrop,
33    },
34    BlobFormat, Hash, HashAndFormat, Tag, TempTag, IROH_BLOCK_SIZE,
35};
36
37/// A fully featured in memory database for iroh-blobs, including support for
38/// partial blobs.
39#[derive(Debug, Clone, Default)]
40pub struct Store {
41    inner: Arc<StoreInner>,
42}
43
44#[derive(Debug, Default)]
45struct StoreInner(RwLock<StateInner>);
46
47impl TagDrop for StoreInner {
48    fn on_drop(&self, inner: &HashAndFormat) {
49        tracing::trace!("temp tag drop: {:?}", inner);
50        let mut state = self.0.write().unwrap();
51        state.temp.dec(inner);
52    }
53}
54
55impl TagCounter for StoreInner {
56    fn on_create(&self, inner: &HashAndFormat) {
57        tracing::trace!("temp tagging: {:?}", inner);
58        let mut state = self.0.write().unwrap();
59        state.temp.inc(inner);
60    }
61}
62
63impl Store {
64    /// Create a new in memory store
65    pub fn new() -> Self {
66        Self::default()
67    }
68
69    /// Take a write lock on the store
70    fn write_lock(&self) -> RwLockWriteGuard<'_, StateInner> {
71        self.inner.0.write().unwrap()
72    }
73
74    /// Take a read lock on the store
75    fn read_lock(&self) -> RwLockReadGuard<'_, StateInner> {
76        self.inner.0.read().unwrap()
77    }
78
79    fn import_bytes_sync(
80        &self,
81        id: u64,
82        bytes: Bytes,
83        format: BlobFormat,
84        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
85    ) -> io::Result<TempTag> {
86        progress.blocking_send(ImportProgress::OutboardProgress { id, offset: 0 })?;
87        let progress2 = progress.clone();
88        let cb = move |offset| {
89            progress2
90                .try_send(ImportProgress::OutboardProgress { id, offset })
91                .ok();
92        };
93        let (storage, hash) = MutableMemStorage::complete(bytes, cb);
94        progress.blocking_send(ImportProgress::OutboardDone { id, hash })?;
95        use super::Store;
96        let tag = self.temp_tag(HashAndFormat { hash, format });
97        let entry = Entry {
98            inner: Arc::new(EntryInner {
99                hash,
100                data: RwLock::new(storage),
101            }),
102            complete: true,
103        };
104        self.write_lock().entries.insert(hash, entry);
105        Ok(tag)
106    }
107
108    fn export_sync(
109        &self,
110        hash: Hash,
111        target: PathBuf,
112        _mode: ExportMode,
113        progress: impl Fn(u64) -> io::Result<()> + Send + Sync + 'static,
114    ) -> io::Result<()> {
115        tracing::trace!("exporting {} to {}", hash, target.display());
116
117        if !target.is_absolute() {
118            return Err(io::Error::new(
119                io::ErrorKind::InvalidInput,
120                "target path must be absolute",
121            ));
122        }
123        let parent = target.parent().ok_or_else(|| {
124            io::Error::new(
125                io::ErrorKind::InvalidInput,
126                "target path has no parent directory",
127            )
128        })?;
129        // create the directory in which the target file is
130        std::fs::create_dir_all(parent)?;
131        let state = self.read_lock();
132        let entry = state
133            .entries
134            .get(&hash)
135            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "hash not found"))?;
136        let reader = &entry.inner.data;
137        let size = reader.read().unwrap().current_size();
138        let mut file = std::fs::File::create(target)?;
139        for offset in (0..size).step_by(1024 * 1024) {
140            let bytes = reader.read().unwrap().read_data_at(offset, 1024 * 1024);
141            file.write_at(offset, &bytes)?;
142            progress(offset)?;
143        }
144        std::io::Write::flush(&mut file)?;
145        drop(file);
146        Ok(())
147    }
148}
149
150impl super::Store for Store {
151    async fn import_file(
152        &self,
153        path: std::path::PathBuf,
154        _mode: ImportMode,
155        format: BlobFormat,
156        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
157    ) -> io::Result<(TempTag, u64)> {
158        let this = self.clone();
159        tokio::task::spawn_blocking(move || {
160            let id = progress.new_id();
161            progress.blocking_send(ImportProgress::Found {
162                id,
163                name: path.to_string_lossy().to_string(),
164            })?;
165            progress.try_send(ImportProgress::CopyProgress { id, offset: 0 })?;
166            // todo: provide progress for reading into mem
167            let bytes: Bytes = std::fs::read(path)?.into();
168            let size = bytes.len() as u64;
169            progress.blocking_send(ImportProgress::Size { id, size })?;
170            let tag = this.import_bytes_sync(id, bytes, format, progress)?;
171            Ok((tag, size))
172        })
173        .await?
174    }
175
176    async fn import_stream(
177        &self,
178        mut data: impl Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static,
179        format: BlobFormat,
180        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
181    ) -> io::Result<(TempTag, u64)> {
182        let this = self.clone();
183        let id = progress.new_id();
184        let name = temp_name();
185        progress.send(ImportProgress::Found { id, name }).await?;
186        let mut bytes = BytesMut::new();
187        while let Some(chunk) = data.next().await {
188            bytes.extend_from_slice(&chunk?);
189            progress
190                .try_send(ImportProgress::CopyProgress {
191                    id,
192                    offset: bytes.len() as u64,
193                })
194                .ok();
195        }
196        let bytes = bytes.freeze();
197        let size = bytes.len() as u64;
198        progress.blocking_send(ImportProgress::Size { id, size })?;
199        let tag = this.import_bytes_sync(id, bytes, format, progress)?;
200        Ok((tag, size))
201    }
202
203    async fn import_bytes(&self, bytes: Bytes, format: BlobFormat) -> io::Result<TempTag> {
204        let this = self.clone();
205        tokio::task::spawn_blocking(move || {
206            this.import_bytes_sync(0, bytes, format, IgnoreProgressSender::default())
207        })
208        .await?
209    }
210
211    async fn rename_tag(&self, from: Tag, to: Tag) -> io::Result<()> {
212        let mut state = self.write_lock();
213        let value = state.tags.remove(&from).ok_or_else(|| {
214            io::Error::new(
215                io::ErrorKind::NotFound,
216                format!("tag not found: {:?}", from),
217            )
218        })?;
219        state.tags.insert(to, value);
220        Ok(())
221    }
222
223    async fn set_tag(&self, name: Tag, value: HashAndFormat) -> io::Result<()> {
224        let mut state = self.write_lock();
225        state.tags.insert(name, value);
226        Ok(())
227    }
228
229    async fn delete_tags(&self, from: Option<Tag>, to: Option<Tag>) -> io::Result<()> {
230        let mut state = self.write_lock();
231        info!("deleting tags from {:?} to {:?}", from, to);
232        // state.tags.remove(&from.unwrap());
233        // todo: more efficient impl
234        state.tags.retain(|tag, _| {
235            if let Some(from) = &from {
236                if tag < from {
237                    return true;
238                }
239            }
240            if let Some(to) = &to {
241                if tag >= to {
242                    return true;
243                }
244            }
245            info!("    removing {:?}", tag);
246            false
247        });
248        Ok(())
249    }
250
251    async fn create_tag(&self, hash: HashAndFormat) -> io::Result<Tag> {
252        let mut state = self.write_lock();
253        let tag = Tag::auto(SystemTime::now(), |x| state.tags.contains_key(x));
254        state.tags.insert(tag.clone(), hash);
255        Ok(tag)
256    }
257
258    fn temp_tag(&self, tag: HashAndFormat) -> TempTag {
259        self.inner.temp_tag(tag)
260    }
261
262    async fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G)
263    where
264        G: Fn() -> Gut,
265        Gut: Future<Output = BTreeSet<Hash>> + Send,
266    {
267        super::gc_run_loop(self, config, move || async { Ok(()) }, protected_cb).await
268    }
269
270    async fn delete(&self, hashes: Vec<Hash>) -> io::Result<()> {
271        let mut state = self.write_lock();
272        for hash in hashes {
273            if !state.temp.contains(&hash) {
274                state.entries.remove(&hash);
275            }
276        }
277        Ok(())
278    }
279
280    async fn shutdown(&self) {}
281
282    async fn sync(&self) -> io::Result<()> {
283        Ok(())
284    }
285}
286
287#[derive(Debug, Default)]
288struct StateInner {
289    entries: BTreeMap<Hash, Entry>,
290    tags: BTreeMap<Tag, HashAndFormat>,
291    temp: TempCounterMap,
292}
293
294/// An in memory entry
295#[derive(Debug, Clone)]
296pub struct Entry {
297    inner: Arc<EntryInner>,
298    complete: bool,
299}
300
301#[derive(Debug)]
302struct EntryInner {
303    hash: Hash,
304    data: RwLock<MutableMemStorage>,
305}
306
307impl MapEntry for Entry {
308    fn hash(&self) -> Hash {
309        self.inner.hash
310    }
311
312    fn size(&self) -> BaoBlobSize {
313        let size = self.inner.data.read().unwrap().current_size();
314        BaoBlobSize::new(size, self.complete)
315    }
316
317    fn is_complete(&self) -> bool {
318        self.complete
319    }
320
321    async fn outboard(&self) -> io::Result<impl Outboard> {
322        let size = self.inner.data.read().unwrap().current_size();
323        Ok(PreOrderOutboard {
324            root: self.hash().into(),
325            tree: BaoTree::new(size, IROH_BLOCK_SIZE),
326            data: OutboardReader(self.inner.clone()),
327        })
328    }
329
330    async fn data_reader(&self) -> io::Result<impl AsyncSliceReader> {
331        Ok(DataReader(self.inner.clone()))
332    }
333}
334
335impl MapEntryMut for Entry {
336    async fn batch_writer(&self) -> io::Result<impl BaoBatchWriter> {
337        Ok(BatchWriter(self.inner.clone()))
338    }
339}
340
341struct DataReader(Arc<EntryInner>);
342
343impl AsyncSliceReader for DataReader {
344    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
345        Ok(self.0.data.read().unwrap().read_data_at(offset, len))
346    }
347
348    async fn size(&mut self) -> std::io::Result<u64> {
349        Ok(self.0.data.read().unwrap().data_len())
350    }
351}
352
353struct OutboardReader(Arc<EntryInner>);
354
355impl AsyncSliceReader for OutboardReader {
356    async fn read_at(&mut self, offset: u64, len: usize) -> std::io::Result<Bytes> {
357        Ok(self.0.data.read().unwrap().read_outboard_at(offset, len))
358    }
359
360    async fn size(&mut self) -> std::io::Result<u64> {
361        Ok(self.0.data.read().unwrap().outboard_len())
362    }
363}
364
365struct BatchWriter(Arc<EntryInner>);
366
367impl super::BaoBatchWriter for BatchWriter {
368    async fn write_batch(
369        &mut self,
370        size: u64,
371        batch: Vec<bao_tree::io::fsm::BaoContentItem>,
372    ) -> io::Result<()> {
373        self.0.data.write().unwrap().write_batch(size, &batch)
374    }
375
376    async fn sync(&mut self) -> io::Result<()> {
377        Ok(())
378    }
379}
380
381impl super::Map for Store {
382    type Entry = Entry;
383
384    async fn get(&self, hash: &Hash) -> std::io::Result<Option<Self::Entry>> {
385        Ok(self.inner.0.read().unwrap().entries.get(hash).cloned())
386    }
387}
388
389impl super::MapMut for Store {
390    type EntryMut = Entry;
391
392    async fn get_mut(&self, hash: &Hash) -> std::io::Result<Option<Self::EntryMut>> {
393        self.get(hash).await
394    }
395
396    async fn get_or_create(&self, hash: Hash, _size: u64) -> std::io::Result<Entry> {
397        let entry = Entry {
398            inner: Arc::new(EntryInner {
399                hash,
400                data: RwLock::new(MutableMemStorage::default()),
401            }),
402            complete: false,
403        };
404        Ok(entry)
405    }
406
407    async fn entry_status(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
408        self.entry_status_sync(hash)
409    }
410
411    fn entry_status_sync(&self, hash: &Hash) -> std::io::Result<crate::store::EntryStatus> {
412        Ok(match self.inner.0.read().unwrap().entries.get(hash) {
413            Some(entry) => {
414                if entry.complete {
415                    crate::store::EntryStatus::Complete
416                } else {
417                    crate::store::EntryStatus::Partial
418                }
419            }
420            None => crate::store::EntryStatus::NotFound,
421        })
422    }
423
424    async fn insert_complete(&self, mut entry: Entry) -> std::io::Result<()> {
425        let hash = entry.hash();
426        let mut inner = self.inner.0.write().unwrap();
427        let complete = inner
428            .entries
429            .get(&hash)
430            .map(|x| x.complete)
431            .unwrap_or_default();
432        if !complete {
433            entry.complete = true;
434            inner.entries.insert(hash, entry);
435        }
436        Ok(())
437    }
438}
439
440impl ReadableStore for Store {
441    async fn blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
442        let entries = self.read_lock().entries.clone();
443        Ok(Box::new(
444            entries
445                .into_values()
446                .filter(|x| x.complete)
447                .map(|x| Ok(x.hash())),
448        ))
449    }
450
451    async fn partial_blobs(&self) -> io::Result<crate::store::DbIter<Hash>> {
452        let entries = self.read_lock().entries.clone();
453        Ok(Box::new(
454            entries
455                .into_values()
456                .filter(|x| !x.complete)
457                .map(|x| Ok(x.hash())),
458        ))
459    }
460
461    async fn tags(
462        &self,
463        from: Option<Tag>,
464        to: Option<Tag>,
465    ) -> io::Result<crate::store::DbIter<(crate::Tag, crate::HashAndFormat)>> {
466        #[allow(clippy::mutable_key_type)]
467        let tags = self.read_lock().tags.clone();
468        let tags = tags
469            .into_iter()
470            .filter(move |(tag, _)| {
471                if let Some(from) = &from {
472                    if tag < from {
473                        return false;
474                    }
475                }
476                if let Some(to) = &to {
477                    if tag >= to {
478                        return false;
479                    }
480                }
481                true
482            })
483            .map(Ok);
484        Ok(Box::new(tags))
485    }
486
487    fn temp_tags(&self) -> Box<dyn Iterator<Item = crate::HashAndFormat> + Send + Sync + 'static> {
488        let tags = self.read_lock().temp.keys();
489        Box::new(tags)
490    }
491
492    async fn consistency_check(
493        &self,
494        _repair: bool,
495        _tx: BoxedProgressSender<ConsistencyCheckProgress>,
496    ) -> io::Result<()> {
497        todo!()
498    }
499
500    async fn export(
501        &self,
502        hash: Hash,
503        target: std::path::PathBuf,
504        mode: crate::store::ExportMode,
505        progress: ExportProgressCb,
506    ) -> io::Result<()> {
507        let this = self.clone();
508        tokio::task::spawn_blocking(move || this.export_sync(hash, target, mode, progress)).await?
509    }
510}