1use crate::drive::DriveError;
21use rusqlite::OptionalExtension;
22use std::path::PathBuf;
23
24#[derive(Debug, thiserror::Error)]
25pub enum DiskError {
26 #[error(transparent)]
31 DbError(#[from] rusqlite::Error),
32 #[error("Failed to join a tokio blocking task: {0}")]
34 JoinError(#[from] tokio::task::JoinError),
35 #[error("Maximum disk size reached")]
40 MaxSizeExceeded,
41 #[error("this error was replaced, seeing this is a bug.")]
42 #[doc(hidden)]
43 Stolen,
44}
45
46impl DiskError {
47 pub(crate) fn steal(&mut self) -> Self {
49 let mut swapped = DiskError::Stolen;
50 std::mem::swap(self, &mut swapped);
51 swapped
52 }
53}
54
55pub struct DiskBuilder {
57 pub cache_size_mb: usize,
61 pub max_stored_mb: usize,
68}
69
70impl Default for DiskBuilder {
71 fn default() -> Self {
72 Self {
73 cache_size_mb: 32,
74 max_stored_mb: 10 * 1024, }
76 }
77}
78
79impl DiskBuilder {
80 pub fn new() -> Self {
82 Default::default()
83 }
84 pub fn with_cache_size_mb(mut self, size: usize) -> Self {
88 self.cache_size_mb = size;
89 self
90 }
91 pub fn with_max_stored_mb(mut self, max: usize) -> Self {
95 self.max_stored_mb = max;
96 self
97 }
98 pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> {
100 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
101 }
102}
103
104pub struct DiskStore {
106 conn: rusqlite::Connection,
107 max_stored: usize,
108 stored: usize,
109}
110
111impl DiskStore {
112 pub async fn new(
114 path: PathBuf,
115 cache_mb: usize,
116 max_stored_mb: usize,
117 ) -> Result<Self, DiskError> {
118 let max_stored = max_stored_mb * 2_usize.pow(20);
119 let conn = tokio::task::spawn_blocking(move || {
120 let conn = rusqlite::Connection::open(path)?;
121
122 let sqlite_one_mb = -(2_i64.pow(10)); conn.pragma_update(None, "journal_mode", "WAL")?;
127 conn.pragma_update(None, "synchronous", "OFF")?;
129 conn.pragma_update(
130 None,
131 "cache_size",
132 (cache_mb as i64 * sqlite_one_mb).to_string(),
133 )?;
134 Self::reset_tables(&conn)?;
135
136 Ok::<_, DiskError>(conn)
137 })
138 .await??;
139
140 Ok(Self {
141 conn,
142 max_stored,
143 stored: 0,
144 })
145 }
146 pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
147 let tx = self.conn.transaction()?;
148 Ok(SqliteWriter {
149 tx,
150 stored: &mut self.stored,
151 max: self.max_stored,
152 })
153 }
154 pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
155 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
156 Ok(SqliteReader { select_stmt })
157 }
158 pub async fn reset(self) -> Result<Self, DiskError> {
160 tokio::task::spawn_blocking(move || {
161 Self::reset_tables(&self.conn)?;
162 Ok(self)
163 })
164 .await?
165 }
166 fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
167 conn.execute("DROP TABLE IF EXISTS blocks", ())?;
168 conn.execute(
169 "CREATE TABLE blocks (
170 key BLOB PRIMARY KEY NOT NULL,
171 val BLOB NOT NULL
172 ) WITHOUT ROWID",
173 (),
174 )?;
175 Ok(())
176 }
177}
178
179pub(crate) struct SqliteWriter<'conn> {
180 tx: rusqlite::Transaction<'conn>,
181 stored: &'conn mut usize,
182 max: usize,
183}
184
185impl SqliteWriter<'_> {
186 pub(crate) fn put_many(
187 &mut self,
188 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
189 ) -> Result<(), DriveError> {
190 let mut insert_stmt = self
191 .tx
192 .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
193 .map_err(DiskError::DbError)?;
194 for pair in kv {
195 let (k, v) = pair?;
196 *self.stored += v.len();
197 if *self.stored > self.max {
198 return Err(DiskError::MaxSizeExceeded.into());
199 }
200 insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
201 }
202 Ok(())
203 }
204 pub fn commit(self) -> Result<(), DiskError> {
205 self.tx.commit()?;
206 Ok(())
207 }
208}
209
210pub(crate) struct SqliteReader<'conn> {
211 select_stmt: rusqlite::Statement<'conn>,
212}
213
214impl SqliteReader<'_> {
215 pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
216 self.select_stmt
217 .query_one((&key,), |row| row.get(0))
218 .optional()
219 }
220}