possum/
tx.rs

1use super::*;
2
/// This is more work to be done after the Handle conn mutex is released.
///
/// Produced by `Transaction::commit` and consumed by `PostCommitWork::complete`.
#[must_use]
pub(crate) struct PostCommitWork<H> {
    // Provides the `Handle` (via `AsRef`) that `complete` operates on.
    handle: H,
    // Locations of values removed by the transaction. `complete` passes them to
    // `send_values_for_delete` unless hole punching is disabled in the instance limits.
    deleted_values: Vec<NonzeroValueLocation>,
    // Files that changed in the transaction. `complete` removes their entries from the handle's
    // `clones` so stale clones are forgotten.
    altered_files: HashSet<FileId>,
}
10
/// Exposes a rusqlite Transaction to implement ReadTransaction.
pub trait ReadOnlyTransactionAccessor {
    /// Returns the underlying rusqlite transaction. Callers in this file only run statements on
    /// it that have been checked to be read-only.
    fn readonly_transaction(&self) -> &rusqlite::Transaction;
}
15
/// Extends rusqlite objects with stuff needed for ReadTransaction.
trait ReadOnlyRusqliteTransaction {
    /// Prepares `sql` as a cached statement that is expected to be read-only.
    fn prepare_cached_readonly(&self, sql: &str) -> rusqlite::Result<CachedStatement>;
}
20
21// This could just as easily be implemented for rusqlite::Connection too.
22impl ReadOnlyRusqliteTransaction for rusqlite::Transaction<'_> {
23    fn prepare_cached_readonly(&self, sql: &str) -> rusqlite::Result<CachedStatement> {
24        prepare_cached_readonly(self.borrow(), sql)
25    }
26}
27
28fn prepare_cached_readonly<'a>(
29    conn: &'a Connection,
30    sql: &str,
31) -> rusqlite::Result<CachedStatement<'a>> {
32    let stmt = conn.prepare_cached(sql)?;
33    assert!(stmt.readonly());
34    Ok(stmt)
35}
36
/// A shared reference to a [`ReadTransactionOwned`].
pub type ReadTransactionRef<'a> = &'a ReadTransactionOwned<'a>;
38
/// Helper type for wrapping rusqlite::Transaction to only provide ReadTransaction capabilities.
///
/// The wrapped transaction is only directly reachable from within the crate.
pub struct ReadTransactionOwned<'a>(pub(crate) rusqlite::Transaction<'a>);
41
42impl ReadOnlyTransactionAccessor for ReadTransactionOwned<'_> {
43    fn readonly_transaction(&self) -> &rusqlite::Transaction {
44        &self.0
45    }
46}
47
48/// Extra methods for types exposing a rusqlite Transaction that's allowed to do read transaction
49/// stuff.
50pub trait ReadTransaction: ReadOnlyTransactionAccessor {
51    fn file_values(&self, file_id: FileId) -> rusqlite::Result<FileValues<CachedStatement>> {
52        let stmt = self
53            .readonly_transaction()
54            .prepare_cached_readonly(&format!(
55                "select {} from keys where file_id=? order by file_offset",
56                value_columns_sql()
57            ))?;
58        let iter = FileValues { stmt, file_id };
59        Ok(iter)
60    }
61
62    fn sum_value_length(&self) -> rusqlite::Result<u64> {
63        self.readonly_transaction()
64            .prepare_cached_readonly("select value from sums where key='value_length'")?
65            .query_row([], |row| row.get(0))
66    }
67
68    /// Returns the end offset of the last active value before offset in the same file.
69    fn query_last_end_offset(&self, file_id: &FileId, offset: u64) -> rusqlite::Result<u64> {
70        self.readonly_transaction()
71            .prepare_cached_readonly(
72                "select max(file_offset+value_length) as last_offset \
73                from keys \
74                where file_id=? and file_offset+value_length <= ?",
75            )?
76            .query_row(params![file_id, offset], |row| {
77                // I don't know why, but this can return null for file_ids that have values but
78                // don't fit the other conditions.
79                let res: rusqlite::Result<Option<_>> = row.get(0);
80                res.map(|v| v.unwrap_or_default())
81            })
82    }
83
84    /// Returns the next value offset with at least min_offset.
85    fn next_value_offset(
86        &self,
87        file_id: &FileId,
88        min_offset: u64,
89    ) -> rusqlite::Result<Option<u64>> {
90        self.readonly_transaction()
91            .prepare_cached_readonly(
92                "select min(file_offset) \
93                from keys \
94                where file_id=? and file_offset >= ?",
95            )?
96            .query_row(params![file_id, min_offset], |row| row.get(0))
97    }
98
99    // TODO: Make this iterate.
100    fn list_items(&self, prefix: &[u8]) -> PubResult<Vec<Item>> {
101        let range_end = {
102            let mut prefix = prefix.to_owned();
103            if inc_big_endian_array(&mut prefix) {
104                Some(prefix)
105            } else {
106                None
107            }
108        };
109        match range_end {
110            None => list_items_inner(
111                self.readonly_transaction(),
112                &format!(
113                    "select {}, key from keys where key >= ?",
114                    value_columns_sql()
115                ),
116                [prefix],
117            ),
118            Some(range_end) => list_items_inner(
119                self.readonly_transaction(),
120                &format!(
121                    "select {}, key from keys where key >= ? and key < ?",
122                    value_columns_sql()
123                ),
124                rusqlite::params![prefix, range_end],
125            ),
126        }
127    }
128}
129
130fn list_items_inner(
131    tx: &rusqlite::Transaction,
132    sql: &str,
133    params: impl rusqlite::Params,
134) -> PubResult<Vec<Item>> {
135    tx.prepare_cached_readonly(sql)
136        .unwrap()
137        .query_map(params, |row| {
138            Ok(Item {
139                value: Value::from_row(row)?,
140                key: row.get(VALUE_COLUMN_NAMES.len())?,
141            })
142        })?
143        .collect::<rusqlite::Result<Vec<_>>>()
144        .map_err(Into::into)
145}
146
147impl<H> PostCommitWork<H>
148where
149    H: AsRef<Handle>,
150{
151    pub fn complete(self) {
152        // This has to happen after exclusive files are flushed or there's a tendency for hole
153        // punches to not persist. It doesn't fix the problem, but it significantly reduces it.
154        if !self.handle.as_ref().instance_limits.disable_hole_punching {
155            self.handle
156                .as_ref()
157                .send_values_for_delete(self.deleted_values);
158        }
159        // Forget any references to clones of files that have changed.
160        for file_id in self.altered_files {
161            self.handle.as_ref().clones.lock().unwrap().remove(&file_id);
162        }
163    }
164}
165
// I can't work out how to have a reference to the Connection, and a transaction on it here at the
// same time. TODO: Make this private.
/// A rusqlite transaction paired with its handle and the bookkeeping that is handed over to
/// `PostCommitWork` on commit.
pub struct Transaction<'h, H> {
    // The underlying rusqlite transaction every statement runs on.
    tx: rusqlite::Transaction<'h>,
    // Gives access to the owning `Handle` via `AsRef`.
    handle: H,
    // Locations of values whose keys were deleted or replaced in this transaction.
    deleted_values: Vec<NonzeroValueLocation>,
    // Files that received non-empty values through `insert_key`.
    altered_files: HashSet<FileId>,
}
174
175// TODO: Try doing this with a read trait that just requires a rusqlite::Transaction be available.
176
177impl<H> ReadOnlyTransactionAccessor for Transaction<'_, H> {
178    fn readonly_transaction(&self) -> &rusqlite::Transaction {
179        &self.tx
180    }
181}
182
183impl<T> ReadTransaction for T where T: ReadOnlyTransactionAccessor {}
184
185impl<H> Transaction<'_, H> {
186    pub fn touch_for_read(&mut self, key: &[u8]) -> rusqlite::Result<Value> {
187        // Avoid modifying the manifest. We had to take a write lock already to ensure our data
188        // isn't modified on us, but it still seems to be an improvement. (-67% on read times in
189        // fact).
190        let (file_id, file_offset, value_length, mut last_used, now) = self
191            .tx
192            .prepare_cached_readonly(&format!(
193                "select {}, cast(unixepoch('subsec')*1e3 as integer) \
194                from keys where key=?",
195                value_columns_sql()
196            ))?
197            .query_row([key], |row| row.try_into())?;
198        let update_last_used = last_used != now;
199        // eprintln!("updating last used: {}", update_last_used);
200        if update_last_used {
201            let (new_last_used,) = self
202                .tx
203                .prepare_cached(
204                    r"
205                    update keys
206                    set last_used=cast(unixepoch('subsec')*1e3 as integer)
207                    where key=?
208                    returning last_used
209                    ",
210                )?
211                .query_row([key], |row| row.try_into())?;
212            // This can in fact change between calls. Since we're updating now anyway, we don't
213            // really care.
214            //assert_eq!(new_last_used, now);
215            last_used = new_last_used;
216        }
217        Value::from_column_values(file_id, file_offset, value_length, last_used)
218    }
219}
220
221impl<'h, H> Transaction<'h, H>
222where
223    H: AsRef<Handle>,
224{
225    pub fn handle(&self) -> &Handle {
226        self.handle.as_ref()
227    }
228
229    pub(crate) fn commit(mut self) -> Result<PostCommitWork<H>> {
230        self.apply_limits()?;
231        self.tx.commit()?;
232        Ok(PostCommitWork {
233            handle: self.handle,
234            deleted_values: self.deleted_values,
235            altered_files: self.altered_files,
236        })
237    }
238
239    pub fn apply_limits(&mut self) -> Result<()> {
240        if self.tx.transaction_state(None)? != rusqlite::TransactionState::Write {
241            return Ok(());
242        }
243        if let Some(max) = self.handle.as_ref().instance_limits.max_value_length_sum {
244            loop {
245                let actual = self
246                    .sum_value_length()
247                    .context("reading value_length sum")?;
248                if actual <= max {
249                    break;
250                }
251                self.evict_values(actual - max)?;
252            }
253        }
254        Ok(())
255    }
256    pub fn new(tx: rusqlite::Transaction<'h>, handle: H) -> Self {
257        Self {
258            tx,
259            handle,
260            deleted_values: vec![],
261            altered_files: Default::default(),
262        }
263    }
264
265    // TODO: Add a test for renaming onto itself.
266    pub fn rename_value(&mut self, value: &Value, new_key: Vec<u8>) -> PubResult<bool> {
267        match self
268            .tx
269            .prepare_cached(&format!(
270                "delete from keys where key=? returning {}",
271                value_columns_sql()
272            ))?
273            .query_row(params![&new_key], Value::from_row)
274        {
275            Err(QueryReturnedNoRows) => {}
276            Err(err) => return Err(err.into()),
277            Ok(existing_value) => {
278                match existing_value.location {
279                    Nonzero(a) => {
280                        let b = value;
281                        if Some(a.file_offset) == b.file_offset() && Some(&a.file_id) == b.file_id()
282                        {
283                            assert_eq!(a.length, b.length());
284                            // Renamed but the name is the same.
285                            return Ok(true);
286                        }
287                        // Schedule the value that previously had the key to be hole punched.
288                        self.deleted_values.push(a);
289                    }
290                    ZeroLength => {}
291                }
292            }
293        };
294
295        let res: rusqlite::Result<ValueLength> = self
296            .tx
297            .prepare_cached(
298                "update keys set key=? where file_id=? and file_offset=?\
299                returning value_length",
300            )?
301            .query_row(
302                params![new_key, value.file_id(), value.file_offset()],
303                |row| row.get(0),
304            );
305        match res {
306            Err(QueryReturnedNoRows) => Ok(false),
307            Err(err) => Err(err).context("updating value key").map_err(Into::into),
308            Ok(value_length) => {
309                assert_eq!(value_length, value.length());
310                Ok(true)
311            }
312        }
313    }
314
315    // I guess this doesn't handle destination collisions? It should give a unique constraint error
316    // from sqlite.
317    pub fn rename_item(&mut self, from: &[u8], to: &[u8]) -> PubResult<Timestamp> {
318        let row_result = self.tx.query_row(
319            "update keys set key=? where key=? returning last_used",
320            [to, from],
321            |row| {
322                let ts: Timestamp = row.get(0)?;
323                Ok(ts)
324            },
325        );
326        let last_used = match row_result {
327            Err(QueryReturnedNoRows) => Err(Error::NoSuchKey),
328            Ok(ok) => Ok(ok),
329            Err(err) => Err(err.into()),
330        }?;
331        assert_eq!(self.tx.changes(), 1);
332        Ok(last_used)
333    }
334
335    pub(crate) fn insert_key(&mut self, pw: PendingWrite) -> rusqlite::Result<()> {
336        let mut file_id = Some(pw.value_file_id);
337        let mut file_offset = Some(pw.value_file_offset);
338        if pw.value_length == 0 {
339            file_id = None;
340            file_offset = None;
341        }
342        let inserted = self
343            .tx
344            .prepare_cached(
345                "insert into keys (key, file_id, file_offset, value_length)\
346                values (?, ?, ?, ?)",
347            )?
348            .execute(rusqlite::params!(
349                pw.key,
350                file_id,
351                file_offset,
352                pw.value_length
353            ))?;
354        assert_eq!(inserted, 1);
355        if pw.value_length != 0 {
356            self.altered_files.insert(pw.value_file_id);
357        }
358        Ok(())
359    }
360
361    fn push_value_for_deletion(&mut self, value: Value) {
362        match value.location {
363            Nonzero(location) => self.deleted_values.push(location),
364            ZeroLength => {}
365        }
366    }
367
368    pub fn delete_key(&mut self, key: &[u8]) -> rusqlite::Result<Option<c_api::PossumStat>> {
369        let res = self
370            .tx
371            .prepare_cached(&format!(
372                "delete from keys where key=? returning {}",
373                value_columns_sql()
374            ))?
375            .query_row([key], Value::from_row);
376        match res {
377            Err(QueryReturnedNoRows) => Ok(None),
378            Ok(value) => {
379                let stat = value.as_ref().into();
380                self.push_value_for_deletion(value);
381                Ok(Some(stat))
382            }
383            Err(err) => Err(err),
384        }
385    }
386
387    pub fn evict_values(&mut self, target_bytes: u64) -> Result<()> {
388        let mut stmt = self.tx.prepare_cached(&format!(
389            "delete from keys where key_id in (\
390                select key_id from keys order by last_used limit 1\
391            )\
392            returning {}",
393            value_columns_sql()
394        ))?;
395        let mut value_bytes_deleted = 0;
396        let mut values_deleted = vec![];
397        while value_bytes_deleted < target_bytes {
398            let value = stmt.query_row([], Value::from_row)?;
399            value_bytes_deleted += value.length();
400            info!("evicting {:?}", &value);
401            values_deleted.push(value);
402        }
403        drop(stmt);
404        for value in values_deleted {
405            self.push_value_for_deletion(value);
406        }
407        Ok(())
408    }
409}