// ipfs_sqlite_block_store — lib.rs

1//! # IPFS sqlite block store
2//!
3//! A block store for a rust implementation of [ipfs](https://ipfs.io/).
4//!
5//! # Concepts
6//!
7//! ## Aliases
8//!
9//! An alias is a named pin of a root. When a root is aliased, none of the leaves of the dag pointed
10//! to by the root will be collected by gc. However, a root being aliased does not mean that the dag
11//! must be complete.
12//!
13//! ## Temporary aliases
14//!
15//! A temporary alias is an unnamed alias that is just for the purpose of protecting blocks from gc
16//! while a large tree is being constructed. While an alias maps a single name to a single root, a
17//! temporary alias can be assigned to an arbitary number of blocks before the dag is finished.
18//!
19//! A temporary alias will be deleted as soon as the handle goes out of scope.
20//!
21//! ## Garbage Collection (GC)
22//!
23//! GC refers to the process of removing unpinned blocks. It runs only when the configured size
24//! targets are exceeded. [Size targets](SizeTargets) contain both the total size of the store
25//! and the number of blocks.
26//!
27//! GC will run incrementally, deleting blocks until the size targets are no longer exceeded. The
28//! order in which unpinned blocks will be deleted can be customized.
29//!
30//! ## Caching
31//!
32//! For unpinned blocks, it is possible to customize which blocks have the highest value using a
33//! [CacheTracker](cache::CacheTracker). The default is to [do nothing](cache::NoopCacheTracker)
34//! and has no performance overhead.
35//!
36//! The most elaborate implemented strategy is to keep track of access times in a separate database,
37//! via the [SqliteCacheTracker](cache::SqliteCacheTracker), which has a slight performance overhead.
38//!
39//! The performance overhead of writing to an access tracking database on each block read can be
40//! mitigated by using the [AsyncCacheTracker](cache::AsyncCacheTracker) wrapper to perform the database
41//! writes on a different thread.
42//!
43//! # Usage
44//!
45//! ## Blocking
46//!
47//! For blocking usage, use [BlockStore](BlockStore). This is the most low level interface.
48//!
49//! # Major differences to the go-ipfs pinning concept
50//!
51//! - Pinning/aliasing a root does not require that the dag is complete
52//! - Aliases/named pins as opposed to unnamed and non-reference-counted pins
53//! - Temporary pins as a mechanism to keep blocks safe from gc while a tree is being constructed
54pub mod cache;
55mod cidbytes;
56mod db;
57mod error;
58#[cfg(test)]
59mod tests;
60mod transaction;
61
62use cache::{CacheTracker, NoopCacheTracker};
63use db::*;
64use error::Context;
65pub use error::{BlockStoreError, Result};
66use libipld::{codec::References, store::StoreParams, Block, Cid, Ipld};
67use parking_lot::Mutex;
68use rusqlite::{Connection, DatabaseName, OpenFlags};
69use std::{
70    borrow::Cow,
71    collections::HashSet,
72    fmt,
73    iter::FromIterator,
74    marker::PhantomData,
75    mem,
76    ops::DerefMut,
77    path::{Path, PathBuf},
78    sync::{
79        atomic::{AtomicBool, Ordering},
80        Arc,
81    },
82    time::Duration,
83};
84use tracing::*;
85pub use transaction::Transaction;
86
/// Location of the block store database: a file on disk or purely in memory.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum DbPath {
    File(PathBuf),
    Memory,
}

impl DbPath {
    /// True when the store lives only in memory.
    fn is_memory(&self) -> bool {
        matches!(self, DbPath::Memory)
    }
}
98
/// Size targets for a store. Gc of non-pinned blocks will start once one of the size targets is exceeded.
///
/// There are targets for both block count and block size. The reason for this is that a store that has
/// a very large number of tiny blocks will become sluggish despite not having a large total size.
///
/// Size targets only apply to non-pinned blocks. Pinned blocks will never be gced even if exceeding one of the
/// size targets.
#[derive(Debug, Clone, Copy, Default)]
pub struct SizeTargets {
    /// target number of blocks.
    ///
    /// Up to this number, the store will retain everything even if not pinned.
    /// Once this number is exceeded, the store will run garbage collection of all
    /// unpinned blocks until the block criterion is met again.
    ///
    /// To completely disable storing of non-pinned blocks, set this to 0.
    /// Even then, the store will never delete pinned blocks.
    pub count: u64,

    /// target store size.
    ///
    /// Up to this size, the store will retain everything even if not pinned.
    /// Once this size is exceeded, the store will run garbage collection of all
    /// unpinned blocks until the size criterion is met again.
    ///
    /// The store will never delete pinned blocks.
    // NOTE: this counts net block data (see StoreStats::size), not file size on disk.
    pub size: u64,
}
127
128impl SizeTargets {
129    pub fn new(count: u64, size: u64) -> Self {
130        Self { count, size }
131    }
132
133    pub fn exceeded(&self, stats: &StoreStats) -> bool {
134        stats.count > self.count || stats.size > self.size
135    }
136
137    /// Size targets that can not be reached. This can be used to disable gc.
138    pub fn max_value() -> Self {
139        Self {
140            count: u64::max_value(),
141            size: u64::max_value(),
142        }
143    }
144}
145
/// Value for the SQLite `synchronous` pragma, trading durability for speed.
#[derive(Debug, Clone, Copy)]
pub enum Synchronous {
    // this is the most conservative mode. This only works if we have few, large transactions
    Full,
    Normal,
    Off,
}

impl fmt::Display for Synchronous {
    /// Renders the value exactly as SQLite expects it in `PRAGMA synchronous = …`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Synchronous::Full => "FULL",
            Synchronous::Normal => "NORMAL",
            Synchronous::Off => "OFF",
        };
        f.write_str(text)
    }
}
163
/// Configuration for a [BlockStore], built via the `with_*` methods.
#[derive(Debug, Clone)]
pub struct Config {
    // gc starts once one of these targets is exceeded
    size_targets: SizeTargets,
    // strategy deciding which unpinned blocks are most valuable to keep
    cache_tracker: Arc<dyn CacheTracker>,
    // value applied to `PRAGMA synchronous`
    pragma_synchronous: Synchronous,
    // value applied to the page-cache pragma, in pages
    pragma_cache_pages: u64,
    // open in readonly mode
    read_only: bool,
    // create if it does not yet exist
    create: bool,
}
175
176impl Default for Config {
177    fn default() -> Self {
178        Self {
179            size_targets: Default::default(),
180            cache_tracker: Arc::new(NoopCacheTracker),
181            pragma_synchronous: Synchronous::Full, // most conservative setting
182            pragma_cache_pages: 8192, // 32 megabytes with the default page size of 4096
183            read_only: false,
184            create: true,
185        }
186    }
187}
188
189impl Config {
190    pub fn with_read_only(mut self, value: bool) -> Self {
191        self.read_only = value;
192        self
193    }
194    /// Set size targets for the store
195    pub fn with_size_targets(mut self, count: u64, size: u64) -> Self {
196        self.size_targets = SizeTargets { count, size };
197        self
198    }
199    /// Set strategy for which non-pinned blocks to keep in case one of the size targets is exceeded.
200    pub fn with_cache_tracker<T: CacheTracker + 'static>(mut self, cache_tracker: T) -> Self {
201        self.cache_tracker = Arc::new(cache_tracker);
202        self
203    }
204    pub fn with_pragma_synchronous(mut self, value: Synchronous) -> Self {
205        self.pragma_synchronous = value;
206        self
207    }
208    pub fn with_pragma_cache_pages(mut self, value: u64) -> Self {
209        self.pragma_cache_pages = value;
210        self
211    }
212}
213
/// A block store backed by a single SQLite connection.
///
/// `S` carries the [StoreParams] (codecs etc.); no value of `S` is stored.
pub struct BlockStore<S> {
    // the underlying SQLite connection
    conn: Connection,
    // ids of dropped TempPins, drained by cleanup_temp_pins before each gc
    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
    config: Config,
    // remembered so additional_connection can open the same database
    db_path: DbPath,
    // set once the background store-stats recomputation has finished
    recompute_done: Arc<AtomicBool>,
    _s: PhantomData<S>,
}
222
/// Statistics about the contents and disk usage of a store.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    count: u64,
    size: u64,
    page_size: u64,
    used_pages: u64,
    free_pages: u64,
}

impl StoreStats {
    /// Total number of blocks in the store
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Total size of blocks in the store
    pub fn size(&self) -> u64 {
        self.size
    }

    /// Page size used by the SQLite DB
    pub fn page_size(&self) -> u64 {
        self.page_size
    }

    /// Number of used pages in the SQLite DB
    ///
    /// `used_pages() * page_size()` is an upper bound on the space actually
    /// used. The value returned by [`size`](#method.size) is always smaller,
    /// since it only counts net block data, without overhead. A large gap
    /// between the two suggests the need for calling `vacuum`.
    pub fn used_pages(&self) -> u64 {
        self.used_pages
    }

    /// Number of unused pages in the SQLite DB
    ///
    /// Calling `vacuum` can shrink the DB file by at least this many pages,
    /// but note that vacuuming often is a long-running procedure.
    pub fn free_pages(&self) -> u64 {
        self.free_pages
    }
}
266
267/// a handle that contains a temporary pin
268///
269/// Dropping this handle enqueues the pin for dropping before the next gc.
270// do not implement Clone for this!
271pub struct TempPin {
272    id: i64,
273    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
274}
275
276impl TempPin {
277    fn new(expired_temp_pins: Arc<Mutex<Vec<i64>>>) -> Self {
278        Self {
279            id: 0,
280            expired_temp_pins,
281        }
282    }
283}
284
285/// dump the temp alias id so you can find it in the database
286impl fmt::Debug for TempPin {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288        let mut builder = f.debug_struct("TempAlias");
289        if self.id > 0 {
290            builder.field("id", &self.id);
291        } else {
292            builder.field("unused", &true);
293        }
294        builder.finish()
295    }
296}
297
298impl Drop for TempPin {
299    fn drop(&mut self) {
300        if self.id > 0 {
301            self.expired_temp_pins.lock().push(self.id);
302        }
303    }
304}
305
306impl<S> BlockStore<S>
307where
308    S: StoreParams,
309    Ipld: References<S::Codecs>,
310{
311    fn create_connection(db_path: DbPath, config: &Config) -> crate::Result<rusqlite::Connection> {
312        let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_URI;
313        flags |= if config.read_only {
314            OpenFlags::SQLITE_OPEN_READ_ONLY
315        } else {
316            OpenFlags::SQLITE_OPEN_READ_WRITE
317        };
318        if config.create && !config.read_only {
319            flags |= OpenFlags::SQLITE_OPEN_CREATE
320        }
321        let conn = match db_path {
322            DbPath::Memory => Connection::open_in_memory().ctx("opening in-memory DB")?,
323            DbPath::File(path) => Connection::open_with_flags(path, flags).ctx("opening DB")?,
324        };
325        Ok(conn)
326    }
327
    /// Open a block store at `db_path` with the given configuration.
    ///
    /// For file-backed stores this spawns a background thread that recomputes
    /// the store stats and then sets `recompute_done`.
    pub fn open_path(db_path: DbPath, config: Config) -> crate::Result<Self> {
        let is_memory = db_path.is_memory();
        let mut conn = Self::create_connection(db_path.clone(), &config)?;
        // this needs to be done only once, and before the first transaction
        conn.execute_batch("PRAGMA journal_mode = WAL")
            .ctx("setting WAL mode")?;
        init_db(
            &mut conn,
            is_memory,
            config.pragma_cache_pages as i64,
            config.pragma_synchronous,
        )?;
        let mut this = Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
            db_path,
            recompute_done: Arc::new(AtomicBool::new(false)),
            _s: PhantomData,
        };
        if !is_memory {
            // recompute stats on a separate connection so this store's own
            // connection stays usable while the long-running query is ongoing
            let mut conn = this.additional_connection()?;
            std::thread::spawn(move || {
                if let Err(e) = recompute_store_stats(&mut conn.conn) {
                    tracing::error!("cannot recompute store stats: {}", e);
                }
                // This is done to avoid GC doing a wal_checkpoint(RESTART) while the above
                // long-running query is ongoing, since that would block all writers during
                // that period.
                conn.recompute_done.store(true, Ordering::SeqCst);
            });
        } else {
            // nothing to recompute for a fresh in-memory store
            this.recompute_done.store(true, Ordering::SeqCst);
        }
        if this.config.cache_tracker.has_persistent_state() {
            // presumably lets the tracker drop persisted state for ids that no
            // longer exist in the store — see cache::CacheTracker::retain_ids
            let ids = in_txn(
                &mut this.conn,
                Some(("get IDs", Duration::from_secs(1))),
                false,
                get_ids,
            )?;
            this.config.cache_tracker.retain_ids(&ids);
        }
        Ok(this)
    }
373
    /// Create another connection to the underlying database
    ///
    /// This allows you to perform operations in parallel.
    /// Not available for in-memory stores.
    pub fn additional_connection(&self) -> crate::Result<Self> {
        if self.db_path.is_memory() {
            return Err(BlockStoreError::NoAdditionalInMemory);
        }
        let mut conn = Self::create_connection(self.db_path.clone(), &self.config)?;
        // apply the same pragmas as configured for the primary connection
        init_pragmas(
            &mut conn,
            self.db_path.is_memory(),
            self.config.pragma_cache_pages as i64,
        )?;
        conn.pragma_update(
            None,
            "synchronous",
            &self.config.pragma_synchronous.to_string(),
        )
        .ctx("setting synchronous mode")?;
        // the new handle shares the expired-pin queue and the recompute flag
        Ok(Self {
            conn,
            expired_temp_pins: self.expired_temp_pins.clone(),
            config: self.config.clone(),
            db_path: self.db_path.clone(),
            recompute_done: self.recompute_done.clone(),
            _s: PhantomData,
        })
    }
402
    /// Create an in memory block store with the given config
    ///
    /// Convenience wrapper around [`open_path`](BlockStore::open_path).
    pub fn memory(config: Config) -> crate::Result<Self> {
        Self::open_path(DbPath::Memory, config)
    }
407
408    /// Create a persistent block store with the given config
409    pub fn open(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
410        let mut pb: PathBuf = PathBuf::new();
411        pb.push(path);
412        Self::open_path(DbPath::File(pb), config)
413    }
414
    /// Open the file at the given path for testing.
    ///
    /// This will create a writeable in-memory database that is initialized with the content
    /// of the file at the given path.
    pub fn open_test(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
        let mut conn = Self::create_connection(DbPath::Memory, &config)?;
        debug!(
            "Restoring in memory database from {}",
            path.as_ref().display()
        );
        // copy the on-disk database into the fresh in-memory one, logging progress
        conn.restore(
            DatabaseName::Main,
            path,
            Some(|p: rusqlite::backup::Progress| {
                // percentage of pages already copied; zero total pages counts as done
                let percent = if p.pagecount == 0 {
                    100
                } else {
                    (p.pagecount - p.remaining) * 100 / p.pagecount
                };
                if percent % 10 == 0 {
                    debug!("Restoring: {} %", percent);
                }
            }),
        )
        .ctx("restoring test DB from backup")?;
        // inform the cache tracker about the ids present in the restored DB
        let ids = in_txn(
            &mut conn,
            Some(("get ids", Duration::from_secs(1))),
            false,
            get_ids,
        )?;
        config.cache_tracker.retain_ids(&ids);
        Ok(Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
            db_path: DbPath::Memory,
            // no background stats recomputation is started for the in-memory copy
            recompute_done: Arc::new(AtomicBool::new(true)),
            _s: PhantomData,
        })
    }
456
457    pub fn backup(&mut self, path: PathBuf) -> Result<()> {
458        in_txn(&mut self.conn, None, false, move |txn| {
459            txn.backup(DatabaseName::Main, path.as_path(), None)
460                .ctx("backing up DB")
461        })
462    }
463
464    pub fn flush(&mut self) -> crate::Result<()> {
465        in_txn(&mut self.conn, None, false, |txn| {
466            txn.pragma_update(None, "wal_checkpoint", &"TRUNCATE")
467                .ctx("flushing WAL")
468        })
469    }
470
471    pub fn integrity_check(&mut self) -> crate::Result<()> {
472        let result = integrity_check(&mut self.conn)?;
473        if result == vec!["ok".to_owned()] {
474            Ok(())
475        } else {
476            let error_text = result.join(";");
477            Err(crate::error::BlockStoreError::SqliteError(
478                rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(11), Some(error_text)),
479                "checking integrity",
480            ))
481        }
482        // FIXME add actual integrity check on the stored blocks
483    }
484
    /// Start a transaction on this store.
    ///
    /// Reads and writes go through [Transaction]s; writes take effect on commit.
    pub fn transaction(&mut self) -> Transaction<'_, S> {
        Transaction::new(self)
    }
488
489    /// Get a temporary alias for safely adding blocks to the store
490    pub fn temp_pin(&self) -> TempPin {
491        TempPin::new(self.expired_temp_pins.clone())
492    }
493
    /// Run a full VACUUM on the SQLITE database
    ///
    /// This may take a while, blocking all other writes to the store.
    pub fn vacuum(&mut self) -> Result<()> {
        // delegates to the db module's implementation
        vacuum(&mut self.conn)
    }
500
501    /// Perform maintenance on the TempPins
502    ///
503    /// This is done automatically upon every (incremental) GC, so you normally don’t need to call this.
504    pub fn cleanup_temp_pins(&mut self) -> Result<()> {
505        // atomically grab the expired_temp_pins until now
506        let expired_temp_pins = mem::take(self.expired_temp_pins.lock().deref_mut());
507        in_txn(
508            &mut self.conn,
509            Some(("dropping expired temp_pins", Duration::from_millis(100))),
510            true,
511            move |txn| {
512                // get rid of dropped temp aliases, this should be fast
513                for id in expired_temp_pins.iter() {
514                    delete_temp_pin(txn, *id)?;
515                }
516                Ok(())
517            },
518        )
519    }
520
    /// Perform full GC
    ///
    /// This is the same as running incremental GC without limits, plus a full SQLITE VACUUM.
    pub fn gc(&mut self) -> Result<()> {
        // drop expired temp pins first so their blocks become collectable
        self.cleanup_temp_pins()?;
        self.flush()?;
        // effectively unlimited gc: collect until size targets are met
        incremental_gc(
            &mut self.conn,
            usize::MAX,
            Duration::from_secs(u32::MAX.into()),
            self.config.size_targets,
            &self.config.cache_tracker,
        )?;
        // reclaim the freed file space
        self.vacuum()?;
        Ok(())
    }
537
538    fn maybe_checkpoint(&mut self) -> Result<()> {
539        if self.recompute_done.load(Ordering::SeqCst) {
540            self.conn
541                .pragma_update(None, "journal_size_limit", 10_000_000i64)
542                .ctx("setting journal_size_limit")?;
543            self.conn
544                .pragma_update(None, "wal_checkpoint", &"RESTART")
545                .ctx("running wal_checkpoint(RESTART)")?;
546        }
547        Ok(())
548    }
549
    /// Perform an incremental garbage collection.
    ///
    /// Will collect unpinned blocks until either the size targets are met again, or at minimum
    /// `min_blocks` blocks are collected. Then it will continue collecting blocks until `max_duration`
    /// is elapsed.
    ///
    /// Note that this might significantly exceed `max_duration` for various reasons.
    ///
    /// Returns true if either size targets are met or there are no unpinned blocks left.
    pub fn incremental_gc(&mut self, min_blocks: usize, max_duration: Duration) -> Result<bool> {
        let stats = self.get_store_stats()?;
        let _span = tracing::debug_span!("incGC", stats = ?&stats).entered();
        // drop expired temp pins so their blocks become eligible for collection
        self.cleanup_temp_pins()?;
        // checkpoint before and after the gc; this is a no-op while the
        // background stats recomputation is still running
        self.maybe_checkpoint()?;
        let ret = incremental_gc(
            &mut self.conn,
            min_blocks,
            max_duration,
            self.config.size_targets,
            &self.config.cache_tracker,
        )?;
        self.maybe_checkpoint()?;
        // reclaim some free pages without a full (blocking) VACUUM
        in_txn(
            &mut self.conn,
            Some(("incremental_vacuum", Duration::from_millis(500))),
            false,
            |txn| {
                txn.execute_batch("PRAGMA incremental_vacuum")
                    .ctx("incremental vacuum")
            },
        )?;
        Ok(ret)
    }
583}
584
// Generates BlockStore methods that forward to the same-named Transaction
// method: each generated method opens a transaction, performs the call, and
// commits before returning. Doc attributes and an optional single generic
// parameter are passed through to the generated method.
macro_rules! delegate {
    ($($(#[$attr:meta])*$n:ident$(<$v:ident : $vt:path>)?($($arg:ident : $typ:ty),*) -> $ret:ty;)+) => {
        $(
            $(#[$attr])*
            pub fn $n$(<$v: $vt>)?(&mut self, $($arg: $typ),*) -> $ret {
                let mut txn = self.transaction();
                let ret = txn.$n($($arg),*)?;
                txn.commit()?;
                Ok(ret)
            }
        )+
    };
}
598
599impl<S> BlockStore<S>
600where
601    S: StoreParams,
602    Ipld: References<S::Codecs>,
603{
    /// Set or delete an alias
    ///
    /// A `link` of `None` removes the alias.
    // NOTE(review): unlike the delegate!-generated methods this does not call
    // commit(); presumably Transaction handles that on drop — verify in transaction.rs
    pub fn alias<'b>(
        &mut self,
        name: impl Into<Cow<'b, [u8]>>,
        link: Option<&'b Cid>,
    ) -> Result<()> {
        self.transaction().alias(name, link)
    }
612
    /// Resolves an alias to a cid
    ///
    /// Presumably returns `None` when no alias of that name exists.
    pub fn resolve<'b>(&mut self, name: impl Into<Cow<'b, [u8]>>) -> Result<Option<Cid>> {
        self.transaction().resolve(name)
    }
617
    // Every method in this list is generated by `delegate!` above: the call is
    // forwarded to the same-named method on a fresh Transaction, which is then
    // committed before the result is returned.
    delegate! {
        /// Returns the aliases referencing a cid
        reverse_alias(cid: &Cid) -> Result<Option<HashSet<Vec<u8>>>>;

        /// Extend temp pin with an additional cid
        extend_temp_pin(pin: &mut TempPin, link: &Cid) -> Result<()>;

        /// Checks if the store knows about the cid.
        ///
        /// Note that this does not necessarily mean that the store has the data for the cid.
        has_cid(cid: &Cid) -> Result<bool>;

        /// Checks if the store has the data for a cid
        has_block(cid: &Cid) -> Result<bool>;

        /// Get all cids that the store knows about
        get_known_cids<C: FromIterator<Cid>>() -> Result<C>;

        /// Get all cids for which the store has blocks
        get_block_cids<C: FromIterator<Cid>>() -> Result<C>;

        /// Get descendants of a cid
        get_descendants<C: FromIterator<Cid>>(cid: &Cid) -> Result<C>;

        /// Given a root of a dag, gives all cids which we do not have data for.
        get_missing_blocks<C: FromIterator<Cid>>(cid: &Cid) -> Result<C>;

        /// list all aliases
        aliases<C: FromIterator<(Vec<u8>, Cid)>>() -> Result<C>;

        /// Put a block
        ///
        /// This will only be completed once the transaction is successfully committed.
        put_block(block: Block<S>, pin: Option<&mut TempPin>) -> Result<()>;

        /// Get a block
        get_block(cid: &Cid) -> Result<Option<Vec<u8>>>;

        /// Get the stats for the store
        ///
        /// The stats are kept up to date, so this is fast.
        get_store_stats() -> Result<StoreStats>;
    }
661
662    pub fn put_blocks<I>(&mut self, blocks: I, mut pin: Option<&mut TempPin>) -> Result<()>
663    where
664        I: IntoIterator<Item = Block<S>>,
665    {
666        let mut txn = self.transaction();
667        for block in blocks {
668            #[allow(clippy::needless_option_as_deref)]
669            txn.put_block(block, pin.as_deref_mut())?;
670        }
671        txn.commit()
672    }
673}