repo_stream/
disk.rs

/*!
On-disk storage for blocks

Currently this uses SQLite. In testing, SQLite wasn't the fastest option, but it
seemed to be the best behaved in terms of both on-disk space usage and memory
usage.

```no_run
# use repo_stream::{DiskBuilder, DiskError};
# #[tokio::main]
# async fn main() -> Result<(), DiskError> {
let store = DiskBuilder::new()
    .with_cache_size_mb(32)
    .with_max_stored_mb(1024) // errors once more than 1 GiB of processed blocks has been inserted
    .open("/some/path.db".into()).await?;
# Ok(())
# }
```
*/

use crate::drive::DriveError;
use rusqlite::OptionalExtension;
use std::path::PathBuf;

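/// Errors from setting up or using the on-disk block store
///
/// A minimal sketch of matching on the error (the path is illustrative, and
/// `MaxSizeExceeded` is actually produced later, while blocks are written):
///
/// ```no_run
/// # use repo_stream::{DiskBuilder, DiskError};
/// # #[tokio::main]
/// # async fn main() {
/// match DiskBuilder::new().open("/some/path.db".into()).await {
///     Ok(_store) => { /* ready to store blocks */ }
///     Err(DiskError::MaxSizeExceeded) => { /* raise `with_max_stored_mb` and retry */ }
///     Err(other) => eprintln!("disk store error: {other}"),
/// }
/// # }
/// ```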
#[derive(Debug, thiserror::Error)]
pub enum DiskError {
    /// A wrapped database error
    ///
    /// (The wrapped error should probably be obscured so that sqlite types
    /// don't leak into the public API)
    #[error(transparent)]
    DbError(#[from] rusqlite::Error),
    /// A tokio blocking task failed to join
    #[error("Failed to join a tokio blocking task: {0}")]
    JoinError(#[from] tokio::task::JoinError),
    /// The total size of stored blocks exceeded the configured limit
    ///
    /// If you need to process *really* big CARs, you can configure a higher
    /// limit.
    #[error("Maximum disk size reached")]
    MaxSizeExceeded,
    #[error("this error was replaced; seeing it is a bug")]
    #[doc(hidden)]
    Stolen,
}

impl DiskError {
    /// Take the error out by value, leaving a placeholder behind
    ///
    /// A hack for ownership challenges with the disk driver.
    pub(crate) fn steal(&mut self) -> Self {
        let mut swapped = DiskError::Stolen;
        std::mem::swap(self, &mut swapped);
        swapped
    }
}

/// Builder-style disk store setup
pub struct DiskBuilder {
    /// Database in-memory cache allowance
    ///
    /// Default: 32 MiB
    pub cache_size_mb: usize,
    /// Limit on the total size of stored blocks
    ///
    /// Default: 10 GiB
    ///
    /// Note: actual size on disk may be more, but should scale approximately
    /// with this limit
    pub max_stored_mb: usize,
}

impl Default for DiskBuilder {
    fn default() -> Self {
        Self {
            cache_size_mb: 32,
            max_stored_mb: 10 * 1024, // 10 GiB
        }
    }
}

impl DiskBuilder {
    /// Begin configuring the storage with defaults
    pub fn new() -> Self {
        Default::default()
    }
    /// Set the in-memory cache allowance for the database
    ///
    /// Default: 32 MiB
    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
        self.cache_size_mb = size;
        self
    }
    /// Set the approximate stored block size limit
    ///
    /// Default: 10 GiB
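    ///
    /// The value is interpreted as MiB of raw block bytes: for example,
    /// `with_max_stored_mb(2 * 1024)` allows roughly 2 GiB of blocks to be
    /// inserted before [`DiskError::MaxSizeExceeded`] is returned.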
    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
        self.max_stored_mb = max;
        self
    }
    /// Open and initialize the actual disk storage
    pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> {
        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
    }
}

/// On-disk block storage
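///
/// Typically constructed via [`DiskBuilder::open`] rather than directly.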
pub struct DiskStore {
    conn: rusqlite::Connection,
    max_stored: usize,
    stored: usize,
}

impl DiskStore {
    /// Initialize a new disk store
    pub async fn new(
        path: PathBuf,
        cache_mb: usize,
        max_stored_mb: usize,
    ) -> Result<Self, DiskError> {
        let max_stored = max_stored_mb * 2_usize.pow(20);
        let conn = tokio::task::spawn_blocking(move || {
            let conn = rusqlite::Connection::open(path)?;

            let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size

            // conn.pragma_update(None, "journal_mode", "OFF")?;
            // conn.pragma_update(None, "journal_mode", "MEMORY")?;
            conn.pragma_update(None, "journal_mode", "WAL")?;
            // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
            conn.pragma_update(None, "synchronous", "OFF")?;
            conn.pragma_update(
                None,
                "cache_size",
                (cache_mb as i64 * sqlite_one_mb).to_string(),
            )?;
            Self::reset_tables(&conn)?;

            Ok::<_, DiskError>(conn)
        })
        .await??;

        Ok(Self {
            conn,
            max_stored,
            stored: 0,
        })
    }
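    /// Begin a write transaction over the blocks table
    ///
    /// Bytes inserted through the returned writer are counted against the
    /// configured size limit.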
    pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
        let tx = self.conn.transaction()?;
        Ok(SqliteWriter {
            tx,
            stored: &mut self.stored,
            max: self.max_stored,
        })
    }
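    /// Prepare a reusable SELECT statement for looking up blocks by key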
    pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
        let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
        Ok(SqliteReader { select_stmt })
    }
    /// Drop and recreate the kv table
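    ///
    /// A minimal sketch (the path is illustrative); `reset` consumes the store
    /// and returns it ready for reuse:
    ///
    /// ```no_run
    /// # use repo_stream::{DiskBuilder, DiskError};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), DiskError> {
    /// let store = DiskBuilder::new().open("/some/path.db".into()).await?;
    /// // ... process one repo's blocks ...
    /// let _store = store.reset().await?; // blocks table is dropped and recreated
    /// # Ok(())
    /// # }
    /// ```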
    pub async fn reset(self) -> Result<Self, DiskError> {
        tokio::task::spawn_blocking(move || {
            Self::reset_tables(&self.conn)?;
            Ok(self)
        })
        .await?
    }
    fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
        conn.execute("DROP TABLE IF EXISTS blocks", ())?;
        conn.execute(
            "CREATE TABLE blocks (
                key  BLOB PRIMARY KEY NOT NULL,
                val  BLOB NOT NULL
            ) WITHOUT ROWID",
            (),
        )?;
        Ok(())
    }
}

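/// Write half of the store: wraps a sqlite transaction and tracks the running
/// total of stored block bytes against the configured maximum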
pub(crate) struct SqliteWriter<'conn> {
    tx: rusqlite::Transaction<'conn>,
    stored: &'conn mut usize,
    max: usize,
}

impl SqliteWriter<'_> {
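    /// Insert a batch of key/value block pairs
    ///
    /// Returns [`DiskError::MaxSizeExceeded`] (wrapped in a [`DriveError`])
    /// once the running total of stored bytes passes the configured limit.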
    pub(crate) fn put_many(
        &mut self,
        kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
    ) -> Result<(), DriveError> {
        let mut insert_stmt = self
            .tx
            .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
            .map_err(DiskError::DbError)?;
        for pair in kv {
            let (k, v) = pair?;
            *self.stored += v.len();
            if *self.stored > self.max {
                return Err(DiskError::MaxSizeExceeded.into());
            }
            insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
        }
        Ok(())
    }
    pub fn commit(self) -> Result<(), DiskError> {
        self.tx.commit()?;
        Ok(())
    }
}

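/// Read half of the store: a prepared SELECT statement for block lookups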
pub(crate) struct SqliteReader<'conn> {
    select_stmt: rusqlite::Statement<'conn>,
}

impl SqliteReader<'_> {
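    /// Look up a single block by key, returning `None` if it was never stored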
    pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
        self.select_stmt
            .query_one((&key,), |row| row.get(0))
            .optional()
    }
}