1use super::*;
2
/// Work that remains to be done after a `Transaction` has been committed.
///
/// Built by `Transaction::commit` from the transaction's own bookkeeping and consumed by
/// `PostCommitWork::complete`. Marked `#[must_use]` so that silently dropping it (and thereby
/// skipping the follow-up work) produces a compiler warning.
#[must_use]
pub(crate) struct PostCommitWork<H> {
    /// The handle value carried over from the committed transaction.
    handle: H,
    /// Locations of non-zero-length values whose rows the transaction removed.
    deleted_values: Vec<NonzeroValueLocation>,
    /// Files that received non-zero-length values during the transaction.
    altered_files: HashSet<FileId>,
}
10
/// Gives access to an underlying `rusqlite::Transaction` for issuing queries.
///
/// Every implementor automatically gets the query helpers of `ReadTransaction` through the
/// blanket impl in this file.
pub trait ReadOnlyTransactionAccessor {
    /// Returns the wrapped SQLite transaction.
    ///
    /// NOTE(review): the name suggests callers should only run read-only statements through the
    /// returned reference; nothing in this signature enforces that.
    fn readonly_transaction(&self) -> &rusqlite::Transaction;
}
15
/// Private extension trait adding a "prepare, but insist the statement is read-only" method.
trait ReadOnlyRusqliteTransaction {
    /// Prepares `sql` through the connection's statement cache.
    ///
    /// The implementation in this file panics if the prepared statement is not read-only.
    fn prepare_cached_readonly(&self, sql: &str) -> rusqlite::Result<CachedStatement>;
}
20
21impl ReadOnlyRusqliteTransaction for rusqlite::Transaction<'_> {
23 fn prepare_cached_readonly(&self, sql: &str) -> rusqlite::Result<CachedStatement> {
24 prepare_cached_readonly(self.borrow(), sql)
25 }
26}
27
28fn prepare_cached_readonly<'a>(
29 conn: &'a Connection,
30 sql: &str,
31) -> rusqlite::Result<CachedStatement<'a>> {
32 let stmt = conn.prepare_cached(sql)?;
33 assert!(stmt.readonly());
34 Ok(stmt)
35}
36
/// A shared reference to a [`ReadTransactionOwned`], using one lifetime for both the reference
/// and the wrapped transaction.
pub type ReadTransactionRef<'a> = &'a ReadTransactionOwned<'a>;
38
/// Newtype that owns a `rusqlite::Transaction` and exposes it through
/// `ReadOnlyTransactionAccessor` (and therefore through the `ReadTransaction` query helpers).
pub struct ReadTransactionOwned<'a>(pub(crate) rusqlite::Transaction<'a>);
41
42impl ReadOnlyTransactionAccessor for ReadTransactionOwned<'_> {
43 fn readonly_transaction(&self) -> &rusqlite::Transaction {
44 &self.0
45 }
46}
47
48pub trait ReadTransaction: ReadOnlyTransactionAccessor {
51 fn file_values(&self, file_id: FileId) -> rusqlite::Result<FileValues<CachedStatement>> {
52 let stmt = self
53 .readonly_transaction()
54 .prepare_cached_readonly(&format!(
55 "select {} from keys where file_id=? order by file_offset",
56 value_columns_sql()
57 ))?;
58 let iter = FileValues { stmt, file_id };
59 Ok(iter)
60 }
61
62 fn sum_value_length(&self) -> rusqlite::Result<u64> {
63 self.readonly_transaction()
64 .prepare_cached_readonly("select value from sums where key='value_length'")?
65 .query_row([], |row| row.get(0))
66 }
67
68 fn query_last_end_offset(&self, file_id: &FileId, offset: u64) -> rusqlite::Result<u64> {
70 self.readonly_transaction()
71 .prepare_cached_readonly(
72 "select max(file_offset+value_length) as last_offset \
73 from keys \
74 where file_id=? and file_offset+value_length <= ?",
75 )?
76 .query_row(params![file_id, offset], |row| {
77 let res: rusqlite::Result<Option<_>> = row.get(0);
80 res.map(|v| v.unwrap_or_default())
81 })
82 }
83
84 fn next_value_offset(
86 &self,
87 file_id: &FileId,
88 min_offset: u64,
89 ) -> rusqlite::Result<Option<u64>> {
90 self.readonly_transaction()
91 .prepare_cached_readonly(
92 "select min(file_offset) \
93 from keys \
94 where file_id=? and file_offset >= ?",
95 )?
96 .query_row(params![file_id, min_offset], |row| row.get(0))
97 }
98
99 fn list_items(&self, prefix: &[u8]) -> PubResult<Vec<Item>> {
101 let range_end = {
102 let mut prefix = prefix.to_owned();
103 if inc_big_endian_array(&mut prefix) {
104 Some(prefix)
105 } else {
106 None
107 }
108 };
109 match range_end {
110 None => list_items_inner(
111 self.readonly_transaction(),
112 &format!(
113 "select {}, key from keys where key >= ?",
114 value_columns_sql()
115 ),
116 [prefix],
117 ),
118 Some(range_end) => list_items_inner(
119 self.readonly_transaction(),
120 &format!(
121 "select {}, key from keys where key >= ? and key < ?",
122 value_columns_sql()
123 ),
124 rusqlite::params![prefix, range_end],
125 ),
126 }
127 }
128}
129
130fn list_items_inner(
131 tx: &rusqlite::Transaction,
132 sql: &str,
133 params: impl rusqlite::Params,
134) -> PubResult<Vec<Item>> {
135 tx.prepare_cached_readonly(sql)
136 .unwrap()
137 .query_map(params, |row| {
138 Ok(Item {
139 value: Value::from_row(row)?,
140 key: row.get(VALUE_COLUMN_NAMES.len())?,
141 })
142 })?
143 .collect::<rusqlite::Result<Vec<_>>>()
144 .map_err(Into::into)
145}
146
147impl<H> PostCommitWork<H>
148where
149 H: AsRef<Handle>,
150{
151 pub fn complete(self) {
152 if !self.handle.as_ref().instance_limits.disable_hole_punching {
155 self.handle
156 .as_ref()
157 .send_values_for_delete(self.deleted_values);
158 }
159 for file_id in self.altered_files {
161 self.handle.as_ref().clones.lock().unwrap().remove(&file_id);
162 }
163 }
164}
165
/// A SQLite transaction paired with a handle and the bookkeeping needed once it commits.
///
/// The two collections are handed over to a `PostCommitWork` by `Transaction::commit`.
pub struct Transaction<'h, H> {
    /// The underlying SQLite transaction.
    tx: rusqlite::Transaction<'h>,
    /// Handle value; the methods that need a `Handle` require `H: AsRef<Handle>`.
    handle: H,
    /// Locations of non-zero-length values whose rows were removed in this transaction.
    deleted_values: Vec<NonzeroValueLocation>,
    /// Files that had non-zero-length values inserted in this transaction.
    altered_files: HashSet<FileId>,
}
174
175impl<H> ReadOnlyTransactionAccessor for Transaction<'_, H> {
178 fn readonly_transaction(&self) -> &rusqlite::Transaction {
179 &self.tx
180 }
181}
182
/// Blanket impl: every `ReadOnlyTransactionAccessor` gets the default `ReadTransaction` methods.
impl<T> ReadTransaction for T where T: ReadOnlyTransactionAccessor {}
184
185impl<H> Transaction<'_, H> {
186 pub fn touch_for_read(&mut self, key: &[u8]) -> rusqlite::Result<Value> {
187 let (file_id, file_offset, value_length, mut last_used, now) = self
191 .tx
192 .prepare_cached_readonly(&format!(
193 "select {}, cast(unixepoch('subsec')*1e3 as integer) \
194 from keys where key=?",
195 value_columns_sql()
196 ))?
197 .query_row([key], |row| row.try_into())?;
198 let update_last_used = last_used != now;
199 if update_last_used {
201 let (new_last_used,) = self
202 .tx
203 .prepare_cached(
204 r"
205 update keys
206 set last_used=cast(unixepoch('subsec')*1e3 as integer)
207 where key=?
208 returning last_used
209 ",
210 )?
211 .query_row([key], |row| row.try_into())?;
212 last_used = new_last_used;
216 }
217 Value::from_column_values(file_id, file_offset, value_length, last_used)
218 }
219}
220
221impl<'h, H> Transaction<'h, H>
222where
223 H: AsRef<Handle>,
224{
225 pub fn handle(&self) -> &Handle {
226 self.handle.as_ref()
227 }
228
229 pub(crate) fn commit(mut self) -> Result<PostCommitWork<H>> {
230 self.apply_limits()?;
231 self.tx.commit()?;
232 Ok(PostCommitWork {
233 handle: self.handle,
234 deleted_values: self.deleted_values,
235 altered_files: self.altered_files,
236 })
237 }
238
239 pub fn apply_limits(&mut self) -> Result<()> {
240 if self.tx.transaction_state(None)? != rusqlite::TransactionState::Write {
241 return Ok(());
242 }
243 if let Some(max) = self.handle.as_ref().instance_limits.max_value_length_sum {
244 loop {
245 let actual = self
246 .sum_value_length()
247 .context("reading value_length sum")?;
248 if actual <= max {
249 break;
250 }
251 self.evict_values(actual - max)?;
252 }
253 }
254 Ok(())
255 }
256 pub fn new(tx: rusqlite::Transaction<'h>, handle: H) -> Self {
257 Self {
258 tx,
259 handle,
260 deleted_values: vec![],
261 altered_files: Default::default(),
262 }
263 }
264
265 pub fn rename_value(&mut self, value: &Value, new_key: Vec<u8>) -> PubResult<bool> {
267 match self
268 .tx
269 .prepare_cached(&format!(
270 "delete from keys where key=? returning {}",
271 value_columns_sql()
272 ))?
273 .query_row(params![&new_key], Value::from_row)
274 {
275 Err(QueryReturnedNoRows) => {}
276 Err(err) => return Err(err.into()),
277 Ok(existing_value) => {
278 match existing_value.location {
279 Nonzero(a) => {
280 let b = value;
281 if Some(a.file_offset) == b.file_offset() && Some(&a.file_id) == b.file_id()
282 {
283 assert_eq!(a.length, b.length());
284 return Ok(true);
286 }
287 self.deleted_values.push(a);
289 }
290 ZeroLength => {}
291 }
292 }
293 };
294
295 let res: rusqlite::Result<ValueLength> = self
296 .tx
297 .prepare_cached(
298 "update keys set key=? where file_id=? and file_offset=?\
299 returning value_length",
300 )?
301 .query_row(
302 params![new_key, value.file_id(), value.file_offset()],
303 |row| row.get(0),
304 );
305 match res {
306 Err(QueryReturnedNoRows) => Ok(false),
307 Err(err) => Err(err).context("updating value key").map_err(Into::into),
308 Ok(value_length) => {
309 assert_eq!(value_length, value.length());
310 Ok(true)
311 }
312 }
313 }
314
315 pub fn rename_item(&mut self, from: &[u8], to: &[u8]) -> PubResult<Timestamp> {
318 let row_result = self.tx.query_row(
319 "update keys set key=? where key=? returning last_used",
320 [to, from],
321 |row| {
322 let ts: Timestamp = row.get(0)?;
323 Ok(ts)
324 },
325 );
326 let last_used = match row_result {
327 Err(QueryReturnedNoRows) => Err(Error::NoSuchKey),
328 Ok(ok) => Ok(ok),
329 Err(err) => Err(err.into()),
330 }?;
331 assert_eq!(self.tx.changes(), 1);
332 Ok(last_used)
333 }
334
335 pub(crate) fn insert_key(&mut self, pw: PendingWrite) -> rusqlite::Result<()> {
336 let mut file_id = Some(pw.value_file_id);
337 let mut file_offset = Some(pw.value_file_offset);
338 if pw.value_length == 0 {
339 file_id = None;
340 file_offset = None;
341 }
342 let inserted = self
343 .tx
344 .prepare_cached(
345 "insert into keys (key, file_id, file_offset, value_length)\
346 values (?, ?, ?, ?)",
347 )?
348 .execute(rusqlite::params!(
349 pw.key,
350 file_id,
351 file_offset,
352 pw.value_length
353 ))?;
354 assert_eq!(inserted, 1);
355 if pw.value_length != 0 {
356 self.altered_files.insert(pw.value_file_id);
357 }
358 Ok(())
359 }
360
361 fn push_value_for_deletion(&mut self, value: Value) {
362 match value.location {
363 Nonzero(location) => self.deleted_values.push(location),
364 ZeroLength => {}
365 }
366 }
367
368 pub fn delete_key(&mut self, key: &[u8]) -> rusqlite::Result<Option<c_api::PossumStat>> {
369 let res = self
370 .tx
371 .prepare_cached(&format!(
372 "delete from keys where key=? returning {}",
373 value_columns_sql()
374 ))?
375 .query_row([key], Value::from_row);
376 match res {
377 Err(QueryReturnedNoRows) => Ok(None),
378 Ok(value) => {
379 let stat = value.as_ref().into();
380 self.push_value_for_deletion(value);
381 Ok(Some(stat))
382 }
383 Err(err) => Err(err),
384 }
385 }
386
387 pub fn evict_values(&mut self, target_bytes: u64) -> Result<()> {
388 let mut stmt = self.tx.prepare_cached(&format!(
389 "delete from keys where key_id in (\
390 select key_id from keys order by last_used limit 1\
391 )\
392 returning {}",
393 value_columns_sql()
394 ))?;
395 let mut value_bytes_deleted = 0;
396 let mut values_deleted = vec![];
397 while value_bytes_deleted < target_bytes {
398 let value = stmt.query_row([], Value::from_row)?;
399 value_bytes_deleted += value.length();
400 info!("evicting {:?}", &value);
401 values_deleted.push(value);
402 }
403 drop(stmt);
404 for value in values_deleted {
405 self.push_value_for_deletion(value);
406 }
407 Ok(())
408 }
409}