ipfs_embed_sqlite/
lib.rs

1use futures::channel::mpsc;
2pub use ipfs_sqlite_block_store::TempPin;
3use ipfs_sqlite_block_store::{
4    cache::{BlockInfo, CacheTracker, SqliteCacheTracker},
5    BlockStore, Config, SizeTargets, Synchronous,
6};
7use lazy_static::lazy_static;
8use libipld::codec::References;
9use libipld::store::StoreParams;
10use libipld::{Block, Cid, Ipld, Result};
11use parking_lot::Mutex;
12use prometheus::core::{Collector, Desc};
13use prometheus::proto::MetricFamily;
14use prometheus::{HistogramOpts, HistogramVec, IntCounterVec, IntGauge, Opts, Registry};
15use std::future::Future;
16use std::marker::PhantomData;
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::Duration;
20
/// Storage configuration.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StorageConfig {
    /// The path to use for the block store. If it is `None` an in-memory block store
    /// will be used.
    pub path: Option<PathBuf>,
    /// The target number of blocks.
    ///
    /// Up to this number, the store will retain everything even if not pinned. Once this
    /// number is exceeded, the store will run garbage collection of all unpinned blocks
    /// until the block criterion is met again.
    ///
    /// To completely disable storing of non-pinned blocks, set this to 0. Even then,
    /// the store will never delete pinned blocks.
    pub cache_size_blocks: u64,
    /// The target store size in bytes.
    ///
    /// Up to this size, the store will retain everything even if not pinned. Once this
    /// size is exceeded, the store will run garbage collection of all unpinned blocks
    /// until the size criterion is met again.
    ///
    /// The store will never delete pinned blocks.
    pub cache_size_bytes: u64,
    /// The interval at which the garbage collector is run.
    ///
    /// Note that this is implemented as delays between gc runs, so gc will not run exactly
    /// at this interval; there will be some drift if a gc run takes long.
    pub gc_interval: Duration,
    /// The minimum number of blocks to collect in any case.
    ///
    /// Using this parameter, it is possible to guarantee a minimum rate with which the gc will
    /// be able to keep up. It is `gc_min_blocks` / `gc_interval`.
    pub gc_min_blocks: usize,
    /// The target maximum gc duration of a single garbage collector run.
    ///
    /// This can not be guaranteed, since we guarantee to collect at least `gc_min_blocks`. But
    /// as soon as this duration is exceeded, the incremental gc will stop doing additional work.
    pub gc_target_duration: Duration,
}

impl StorageConfig {
    /// Creates a new `StorageConfig` whose cache is limited only by block count.
    ///
    /// `cache_size` becomes `cache_size_blocks` and `gc_interval` is used as given.
    /// The remaining fields are set to their maximum values:
    /// - `cache_size_bytes` is `u64::MAX`, so the byte target never triggers collection.
    /// - `gc_min_blocks` is `usize::MAX`.
    /// - `gc_target_duration` is the largest representable `Duration`.
    ///
    /// Together, the last two make every gc run effectively unbounded.
    pub fn new(path: Option<PathBuf>, cache_size: u64, gc_interval: Duration) -> Self {
        Self {
            path,
            cache_size_blocks: cache_size,
            cache_size_bytes: u64::MAX,
            gc_interval,
            gc_min_blocks: usize::MAX,
            // Largest representable `Duration`; the nanosecond part stays below one second,
            // so `Duration::new` cannot overflow here.
            gc_target_duration: Duration::new(u64::MAX, 1_000_000_000 - 1),
        }
    }
}
74
75#[derive(Clone, Debug, Eq, PartialEq)]
76pub enum StorageEvent {
77    Remove(Cid),
78}
79
80#[derive(Clone)]
81pub struct StorageService<S: StoreParams> {
82    _marker: PhantomData<S>,
83    store: Arc<Mutex<BlockStore>>,
84    gc_target_duration: Duration,
85    gc_min_blocks: usize,
86}
87
88impl<S: StoreParams> StorageService<S>
89where
90    Ipld: References<S::Codecs>,
91{
92    pub fn open(config: StorageConfig, tx: mpsc::UnboundedSender<StorageEvent>) -> Result<Self> {
93        let size = SizeTargets::new(config.cache_size_blocks, config.cache_size_bytes);
94        let store_config = Config::default()
95            .with_size_targets(size)
96            .with_pragma_synchronous(Synchronous::Normal);
97        let store = if let Some(path) = config.path {
98            let tracker = SqliteCacheTracker::open(&path, |access, _| Some(access))?;
99            let tracker = IpfsCacheTracker { tracker, tx };
100            BlockStore::open(path, store_config.with_cache_tracker(tracker))?
101        } else {
102            let tracker = SqliteCacheTracker::memory(|access, _| Some(access))?;
103            let tracker = IpfsCacheTracker { tracker, tx };
104            BlockStore::memory(store_config.with_cache_tracker(tracker))?
105        };
106        let store = Arc::new(Mutex::new(store));
107        let gc = store.clone();
108        let gc_interval = config.gc_interval;
109        let gc_min_blocks = config.gc_min_blocks;
110        let gc_target_duration = config.gc_target_duration;
111        async_global_executor::spawn(async_global_executor::spawn_blocking(move || {
112            std::thread::sleep(gc_interval / 2);
113            loop {
114                tracing::debug!("gc_loop running incremental gc");
115                gc.lock()
116                    .incremental_gc(gc_min_blocks, gc_target_duration)
117                    .ok();
118                std::thread::sleep(gc_interval / 2);
119                tracing::debug!("gc_loop running incremental delete orphaned");
120                gc.lock()
121                    .incremental_delete_orphaned(gc_min_blocks, gc_target_duration)
122                    .ok();
123                std::thread::sleep(gc_interval / 2);
124            }
125        }))
126        .detach();
127        Ok(Self {
128            _marker: PhantomData,
129            gc_target_duration: config.gc_target_duration,
130            gc_min_blocks: config.gc_min_blocks,
131            store,
132        })
133    }
134
135    pub fn create_temp_pin(&self) -> Result<TempPin> {
136        observe_query::<_, std::io::Error, _>("create_temp_pin", || {
137            Ok(self.store.lock().temp_pin())
138        })
139    }
140
141    pub fn temp_pin(
142        &self,
143        temp: &TempPin,
144        iter: impl IntoIterator<Item = Cid> + Send + 'static,
145    ) -> Result<()> {
146        observe_query("temp_pin", || {
147            self.store.lock().assign_temp_pin(&temp, iter)
148        })
149    }
150
151    pub fn iter(&self) -> Result<impl Iterator<Item = Cid>> {
152        let cids = observe_query("iter", || self.store.lock().get_block_cids::<Vec<Cid>>())?;
153        Ok(cids.into_iter())
154    }
155
156    pub fn contains(&self, cid: &Cid) -> Result<bool> {
157        observe_query("contains", || self.store.lock().has_block(cid))
158    }
159
160    pub fn get(&self, cid: &Cid) -> Result<Option<Vec<u8>>> {
161        observe_query("get", || self.store.lock().get_block(cid))
162    }
163
164    pub fn insert(&self, block: &Block<S>) -> Result<()> {
165        observe_query("insert", || self.store.lock().put_block(block, None))
166    }
167
168    pub async fn evict(&self) -> Result<()> {
169        let store = self.store.clone();
170        let gc_min_blocks = self.gc_min_blocks;
171        let gc_target_duration = self.gc_target_duration;
172        async_global_executor::spawn_blocking(move || {
173            while !store
174                .lock()
175                .incremental_gc(gc_min_blocks, gc_target_duration)?
176            {}
177            while !store
178                .lock()
179                .incremental_delete_orphaned(gc_min_blocks, gc_target_duration)?
180            {}
181            Ok(())
182        })
183        .await
184    }
185
186    pub fn alias(&self, alias: &[u8], cid: Option<&Cid>) -> Result<()> {
187        observe_query("alias", || self.store.lock().alias(alias, cid))
188    }
189
190    pub fn resolve(&self, alias: &[u8]) -> Result<Option<Cid>> {
191        observe_query("resolve", || self.store.lock().resolve(alias))
192    }
193
194    pub fn reverse_alias(&self, cid: &Cid) -> Result<Option<Vec<Vec<u8>>>> {
195        observe_query("reverse_alias", || self.store.lock().reverse_alias(cid))
196    }
197
198    pub fn missing_blocks(&self, cid: &Cid) -> Result<Vec<Cid>> {
199        observe_query("missing_blocks", || {
200            self.store.lock().get_missing_blocks(cid)
201        })
202    }
203
204    pub async fn flush(&self) -> Result<()> {
205        let store = self.store.clone();
206        let flush = async_global_executor::spawn_blocking(move || store.lock().flush());
207        observe_future("flush", flush).await
208    }
209
210    pub fn register_metrics(&self, registry: &Registry) -> Result<()> {
211        registry.register(Box::new(QUERIES_TOTAL.clone()))?;
212        registry.register(Box::new(QUERY_DURATION.clone()))?;
213        registry.register(Box::new(SqliteStoreCollector::new(self.store.clone())))?;
214        Ok(())
215    }
216}
217
218#[derive(Debug)]
219struct IpfsCacheTracker<T> {
220    tracker: T,
221    tx: mpsc::UnboundedSender<StorageEvent>,
222}
223
224impl<T: CacheTracker> CacheTracker for IpfsCacheTracker<T> {
225    fn blocks_accessed(&self, blocks: Vec<BlockInfo>) {
226        self.tracker.blocks_accessed(blocks)
227    }
228
229    fn blocks_deleted(&self, blocks: Vec<BlockInfo>) {
230        for block in &blocks {
231            self.tx
232                .unbounded_send(StorageEvent::Remove(*block.cid()))
233                .ok();
234        }
235        self.tracker.blocks_deleted(blocks)
236    }
237
238    fn retain_ids(&self, ids: &[i64]) {
239        self.tracker.retain_ids(ids)
240    }
241
242    fn sort_ids(&self, ids: &mut [i64]) {
243        self.tracker.sort_ids(ids)
244    }
245}
246
247lazy_static! {
248    pub static ref QUERIES_TOTAL: IntCounterVec = IntCounterVec::new(
249        Opts::new(
250            "block_store_queries_total",
251            "Number of block store requests labelled by type."
252        ),
253        &["type"],
254    )
255    .unwrap();
256    pub static ref QUERY_DURATION: HistogramVec = HistogramVec::new(
257        HistogramOpts::new(
258            "block_store_query_duration",
259            "Duration of store queries labelled by type.",
260        ),
261        &["type"],
262    )
263    .unwrap();
264}
265
266fn observe_query<T, E, F>(name: &'static str, query: F) -> Result<T>
267where
268    E: std::error::Error + Send + Sync + 'static,
269    F: FnOnce() -> Result<T, E>,
270{
271    QUERIES_TOTAL.with_label_values(&[name]).inc();
272    let timer = QUERY_DURATION.with_label_values(&[name]).start_timer();
273    let res = query();
274    if res.is_ok() {
275        timer.observe_duration();
276    } else {
277        timer.stop_and_discard();
278    }
279    Ok(res?)
280}
281
282async fn observe_future<T, E, F>(name: &'static str, query: F) -> Result<T>
283where
284    E: std::error::Error + Send + Sync + 'static,
285    F: Future<Output = Result<T, E>>,
286{
287    QUERIES_TOTAL.with_label_values(&[name]).inc();
288    let timer = QUERY_DURATION.with_label_values(&[name]).start_timer();
289    let res = query.await;
290    if res.is_ok() {
291        timer.observe_duration();
292    } else {
293        timer.stop_and_discard();
294    }
295    Ok(res?)
296}
297
298struct SqliteStoreCollector {
299    desc: Desc,
300    store: Arc<Mutex<BlockStore>>,
301}
302
303impl Collector for SqliteStoreCollector {
304    fn desc(&self) -> Vec<&Desc> {
305        vec![&self.desc]
306    }
307
308    fn collect(&self) -> Vec<MetricFamily> {
309        let mut family = vec![];
310
311        if let Ok(stats) = self.store.lock().get_store_stats() {
312            let store_block_count =
313                IntGauge::new("block_store_block_count", "Number of stored blocks").unwrap();
314            store_block_count.set(stats.count() as _);
315            family.push(store_block_count.collect()[0].clone());
316
317            let store_size =
318                IntGauge::new("block_store_size", "Size in bytes of stored blocks").unwrap();
319            store_size.set(stats.size() as _);
320            family.push(store_size.collect()[0].clone());
321        }
322
323        family
324    }
325}
326
327impl SqliteStoreCollector {
328    pub fn new(store: Arc<Mutex<BlockStore>>) -> Self {
329        let desc = Desc::new(
330            "block_store_stats".into(),
331            ".".into(),
332            Default::default(),
333            Default::default(),
334        )
335        .unwrap();
336        Self { store, desc }
337    }
338}
339
// Tests for `StorageService`. Every store comes from `create_store`, which opens an
// in-memory store with a target of two blocks.
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::StreamExt;
    use libipld::cbor::DagCborCodec;
    use libipld::multihash::Code;
    use libipld::store::DefaultParams;
    use libipld::{alias, ipld};

    // Encodes `ipld` as a DAG-CBOR block hashed with BLAKE3-256.
    fn create_block(ipld: &Ipld) -> Block<DefaultParams> {
        Block::encode(DagCborCodec, Code::Blake3_256, ipld).unwrap()
    }

    // Asserts that the block is no longer stored: `reverse_alias` yields `None`.
    macro_rules! assert_evicted {
        ($store:expr, $block:expr) => {
            assert_eq!($store.reverse_alias($block.cid()).unwrap(), None);
        };
    }

    // Asserts that the block is stored and `reverse_alias` reports at least one alias for it.
    macro_rules! assert_pinned {
        ($store:expr, $block:expr) => {
            assert_eq!(
                $store
                    .reverse_alias($block.cid())
                    .unwrap()
                    .map(|a| !a.is_empty()),
                Some(true)
            );
        };
    }

    // Asserts that the block is stored but `reverse_alias` reports no alias for it.
    macro_rules! assert_unpinned {
        ($store:expr, $block:expr) => {
            assert_eq!(
                $store
                    .reverse_alias($block.cid())
                    .unwrap()
                    .map(|a| !a.is_empty()),
                Some(false)
            );
        };
    }

    // Installs a tracing subscriber filtered via `EnvFilter::from_default_env`. The error
    // is ignored because another test may already have installed one.
    fn tracing_try_init() {
        tracing_subscriber::fmt()
            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
            .try_init()
            .ok();
    }

    // Opens an in-memory store with a target of two blocks and returns the receiver of its
    // storage events. The background gc first runs after `gc_interval / 2` = 50 s, which is
    // presumably long after each test has finished, so only explicit `evict` calls collect.
    fn create_store() -> (
        StorageService<DefaultParams>,
        mpsc::UnboundedReceiver<StorageEvent>,
    ) {
        let (tx, rx) = mpsc::unbounded();
        let config = StorageConfig::new(None, 2, Duration::from_secs(100));
        (StorageService::open(config, tx).unwrap(), rx)
    }

    // Exercises eviction with the two-block target and the resulting `Remove` events.
    #[async_std::test]
    async fn test_store_evict() {
        tracing_try_init();
        let (store, mut rx) = create_store();
        let blocks = [
            create_block(&ipld!(0)),
            create_block(&ipld!(1)),
            create_block(&ipld!(2)),
            create_block(&ipld!(3)),
        ];
        // Two blocks fit within the target, so eviction keeps both.
        store.insert(&blocks[0]).unwrap();
        store.insert(&blocks[1]).unwrap();
        store.flush().await.unwrap();
        store.evict().await.unwrap();
        assert_unpinned!(&store, &blocks[0]);
        assert_unpinned!(&store, &blocks[1]);
        // A third block exceeds the target; the earliest-inserted block is expected to go.
        store.insert(&blocks[2]).unwrap();
        store.flush().await.unwrap();
        store.evict().await.unwrap();
        assert_evicted!(&store, &blocks[0]);
        assert_unpinned!(&store, &blocks[1]);
        assert_unpinned!(&store, &blocks[2]);
        // Reading blocks[1] records an access, so adding blocks[3] is expected to
        // evict blocks[2] instead of blocks[1].
        store.get(blocks[1].cid()).unwrap();
        store.insert(&blocks[3]).unwrap();
        store.flush().await.unwrap();
        store.evict().await.unwrap();
        assert_unpinned!(&store, &blocks[1]);
        assert_evicted!(&store, &blocks[2]);
        assert_unpinned!(&store, &blocks[3]);
        // Each deletion was published on the event channel, in eviction order.
        assert_eq!(
            rx.next().await,
            Some(StorageEvent::Remove(*blocks[0].cid()))
        );
        assert_eq!(
            rx.next().await,
            Some(StorageEvent::Remove(*blocks[2].cid()))
        );
    }

    // `a` is referenced by both `b` and `c`; it stays pinned while either of them is aliased.
    #[async_std::test]
    // Short block/alias names mirror the DAG structure being built.
    #[allow(clippy::many_single_char_names)]
    async fn test_store_unpin() {
        tracing_try_init();
        let (store, _) = create_store();
        let a = create_block(&ipld!({ "a": [] }));
        let b = create_block(&ipld!({ "b": [a.cid()] }));
        let c = create_block(&ipld!({ "c": [a.cid()] }));
        let x = alias!(x).as_bytes().to_vec();
        let y = alias!(y).as_bytes().to_vec();
        store.insert(&a).unwrap();
        store.insert(&b).unwrap();
        store.insert(&c).unwrap();
        store.alias(&x, Some(b.cid())).unwrap();
        store.alias(&y, Some(c.cid())).unwrap();
        store.flush().await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);
        assert_pinned!(&store, &c);
        // Clearing `x` unpins `b`, but `a` is still reachable from `c` via `y`.
        store.alias(&x, None).unwrap();
        store.flush().await.unwrap();
        assert_pinned!(&store, &a);
        assert_unpinned!(&store, &b);
        assert_pinned!(&store, &c);
        // Clearing `y` as well leaves every block stored but unpinned.
        store.alias(&y, None).unwrap();
        store.flush().await.unwrap();
        assert_unpinned!(&store, &a);
        assert_unpinned!(&store, &b);
        assert_unpinned!(&store, &c);
    }

    // Two aliases point at the same block; it stays pinned until both are cleared.
    #[async_std::test]
    // Short block/alias names mirror the DAG structure being built.
    #[allow(clippy::many_single_char_names)]
    async fn test_store_unpin2() {
        tracing_try_init();
        let (store, _) = create_store();
        let a = create_block(&ipld!({ "a": [] }));
        let b = create_block(&ipld!({ "b": [a.cid()] }));
        let x = alias!(x).as_bytes().to_vec();
        let y = alias!(y).as_bytes().to_vec();
        store.insert(&a).unwrap();
        store.insert(&b).unwrap();
        store.alias(&x, Some(b.cid())).unwrap();
        store.alias(&y, Some(b.cid())).unwrap();
        store.flush().await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);
        store.alias(&x, None).unwrap();
        store.flush().await.unwrap();
        assert_pinned!(&store, &a);
        assert_pinned!(&store, &b);
        store.alias(&y, None).unwrap();
        store.flush().await.unwrap();
        assert_unpinned!(&store, &a);
        assert_unpinned!(&store, &b);
    }
}