repo_stream/disk.rs

/*!
On-disk storage for blocks

Currently this uses sqlite. In testing, sqlite wasn't the fastest, but it seemed
to be the best behaved in terms of both on-disk space usage and memory usage.

```no_run
# use repo_stream::{DiskBuilder, DiskError};
# #[tokio::main]
# async fn main() -> Result<(), DiskError> {
let store = DiskBuilder::new()
    .with_cache_size_mb(32)
    .with_max_stored_mb(1024) // errors when >1 GiB of processed blocks are inserted
    .open("/some/path.db".into()).await?;
# Ok(())
# }
```
*/

use crate::drive::DriveError;
use rusqlite::OptionalExtension;
use std::path::PathBuf;

#[derive(Debug, thiserror::Error)]
pub enum DiskError {
    /// A wrapped database error
    ///
    /// (The wrapped err should probably be obscured to remove public-facing
    /// sqlite bits)
    #[error(transparent)]
    DbError(#[from] rusqlite::Error),
    /// A tokio blocking task failed to join
    #[error("Failed to join a tokio blocking task: {0}")]
    JoinError(#[from] tokio::task::JoinError),
    /// The total size of stored blocks exceeded the allowed size
    ///
    /// If you need to process *really* big CARs, you can configure a higher
    /// limit.
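    ///
    /// A minimal sketch of raising the cap (the path here is illustrative):
    ///
    /// ```no_run
    /// # use repo_stream::{DiskBuilder, DiskError};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), DiskError> {
    /// let store = DiskBuilder::new()
    ///     .with_max_stored_mb(64 * 1024) // allow up to ~64 GiB of processed blocks
    ///     .open("/some/path.db".into()).await?;
    /// # Ok(())
    /// # }
    /// ```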
    #[error("Maximum disk size reached")]
    MaxSizeExceeded,
    /// Internal placeholder left behind by `steal()`; it should never be observed.
    #[error("this error value was stolen and replaced; seeing it is a bug")]
    #[doc(hidden)]
    Stolen,
}

impl DiskError {
    /// Workaround for ownership challenges with the disk driver: take the
    /// error out of `self`, leaving the `Stolen` placeholder in its place.
    pub(crate) fn steal(&mut self) -> Self {
        let mut swapped = DiskError::Stolen;
        // Swap the placeholder in so the real error can be returned by value.
        std::mem::swap(self, &mut swapped);
        swapped
    }
}

/// Builder-style disk store setup
#[derive(Debug, Clone)]
pub struct DiskBuilder {
    /// Database in-memory cache allowance
    ///
    /// Default: 32 MiB
    pub cache_size_mb: usize,
    /// Database stored block size limit
    ///
    /// Default: 10 GiB
    ///
    /// Note: actual size on disk may be more, but should approximately scale
    /// with this limit
    pub max_stored_mb: usize,
}

impl Default for DiskBuilder {
    fn default() -> Self {
        Self {
            cache_size_mb: 32,
            max_stored_mb: 10 * 1024, // 10 GiB
        }
    }
}

impl DiskBuilder {
    /// Begin configuring the storage with defaults
    pub fn new() -> Self {
        Default::default()
    }
    /// Set the in-memory cache allowance for the database
    ///
    /// Default: 32 MiB
    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
        self.cache_size_mb = size;
        self
    }
    /// Set the approximate stored block size limit
    ///
    /// Default: 10 GiB
    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
        self.max_stored_mb = max;
        self
    }
    /// Open and initialize the disk storage at `path`
    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
    }
}

/// On-disk block storage
pub struct DiskStore {
    conn: rusqlite::Connection,
    /// Upper bound, in bytes, on the total size of stored block values
    max_stored: usize,
    /// Running total, in bytes, of block values inserted so far
    stored: usize,
}

impl DiskStore {
    /// Initialize a new disk store
    pub async fn new(
        path: PathBuf,
        cache_mb: usize,
        max_stored_mb: usize,
    ) -> Result<Self, DiskError> {
        let max_stored = max_stored_mb * 2_usize.pow(20); // MiB -> bytes
        let conn = tokio::task::spawn_blocking(move || {
            let conn = rusqlite::Connection::open(path)?;

            // sqlite interprets a negative cache_size as a size in KiB, so
            // -1024 corresponds to one MiB of page cache
            let sqlite_one_mb = -(2_i64.pow(10));

            // conn.pragma_update(None, "journal_mode", "OFF")?;
            // conn.pragma_update(None, "journal_mode", "MEMORY")?;
            conn.pragma_update(None, "journal_mode", "WAL")?;
            // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
            conn.pragma_update(None, "synchronous", "OFF")?;
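            // Note: synchronous=OFF trades crash durability for write speed,
            // which seems acceptable here since the blocks table is scratch
            // data that reset_tables() drops and recreates anyway.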
            conn.pragma_update(
                None,
                "cache_size",
                (cache_mb as i64 * sqlite_one_mb).to_string(),
            )?;
            Self::reset_tables(&conn)?;

            Ok::<_, DiskError>(conn)
        })
        .await??;

        Ok(Self {
            conn,
            max_stored,
            stored: 0,
        })
    }
    /// Start a write transaction over the block table
    pub(crate) fn get_writer(&mut self) -> Result<SqliteWriter<'_>, DiskError> {
        let tx = self.conn.transaction()?;
        Ok(SqliteWriter {
            tx,
            stored: &mut self.stored,
            max: self.max_stored,
        })
    }
    /// Prepare a reusable point-lookup statement over the block table
    pub(crate) fn get_reader(&self) -> Result<SqliteReader<'_>, DiskError> {
        let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
        Ok(SqliteReader { select_stmt })
    }
    /// Drop and recreate the kv table
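    ///
    /// A rough sketch of reuse across repos (the path is illustrative):
    ///
    /// ```no_run
    /// # use repo_stream::{DiskBuilder, DiskError};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), DiskError> {
    /// let store = DiskBuilder::new().open("/some/path.db".into()).await?;
    /// // ... process one repo's blocks ...
    /// let store = store.reset().await?; // table dropped and recreated, ready for the next
    /// # Ok(())
    /// # }
    /// ```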
    pub async fn reset(self) -> Result<Self, DiskError> {
        tokio::task::spawn_blocking(move || {
            Self::reset_tables(&self.conn)?;
            Ok(self)
        })
        .await?
    }
    fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
        conn.execute("DROP TABLE IF EXISTS blocks", ())?;
        conn.execute(
            // WITHOUT ROWID keeps the blob key as the clustering key, which
            // suits this single-lookup key/value workload
            "CREATE TABLE blocks (
                key  BLOB PRIMARY KEY NOT NULL,
                val  BLOB NOT NULL
            ) WITHOUT ROWID",
            (),
        )?;
        Ok(())
    }
}

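/// Write half of the disk store: batches block inserts inside a single
/// sqlite transaction and enforces the configured size cap.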
pub(crate) struct SqliteWriter<'conn> {
    tx: rusqlite::Transaction<'conn>,
    stored: &'conn mut usize,
    max: usize,
}

impl SqliteWriter<'_> {
    pub(crate) fn put_many(
        &mut self,
        kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
    ) -> Result<(), DriveError> {
        let mut insert_stmt = self
            .tx
            .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
            .map_err(DiskError::DbError)?;
        for pair in kv {
            let (k, v) = pair?;
            // Count this value against the running total and reject the insert
            // if it would push the total past the configured cap.
            *self.stored += v.len();
            if *self.stored > self.max {
                return Err(DiskError::MaxSizeExceeded.into());
            }
            insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
        }
        Ok(())
    }
    pub fn commit(self) -> Result<(), DiskError> {
        self.tx.commit()?;
        Ok(())
    }
}

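/// Read half of the disk store: a prepared point-lookup over the blocks table.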
pub(crate) struct SqliteReader<'conn> {
    select_stmt: rusqlite::Statement<'conn>,
}

impl SqliteReader<'_> {
    pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
        // `.optional()` maps QueryReturnedNoRows to Ok(None) so a missing
        // block isn't treated as an error.
        self.select_stmt
            .query_one((&key,), |row| row.get(0))
            .optional()
    }
}