1use rusqlite::{TransactionBehavior, TransactionState};
2
3use super::*;
4
/// Limits that can be applied to a [`Handle`] instance via `Handle::set_instance_limits`.
///
/// The derived `Default` leaves `max_value_length_sum` as `None` and `disable_hole_punching` as
/// `false`.
#[derive(Default, Debug)]
#[repr(C)]
pub struct Limits {
    // NOTE(review): presumably an upper bound on the summed length of stored values, with `None`
    // meaning no limit. Enforcement is not visible in this file -- confirm against `apply_limits`.
    pub max_value_length_sum: Option<u64>,
    // NOTE(review): presumably disables hole punching of deleted value ranges when `true`. Where
    // the flag is consulted is not visible in this file -- confirm.
    pub disable_hole_punching: bool,
}
12
/// Sending half of the bounded channel that carries batches of deleted value locations to the
/// value puncher thread.
type DeletedValuesSender = sync::mpsc::SyncSender<Vec<NonzeroValueLocation>>;
14
/// A handle on a directory containing a manifest database and values files.
///
/// Created with [`Handle::new`], which opens the manifest database and spawns the value puncher
/// thread.
#[derive(Debug)]
pub struct Handle {
    // Connection to the manifest database. Transactions are started while holding this mutex.
    pub(crate) conn: Mutex<Connection>,
    // Exclusive files held by the handle; `get_exclusive_file` takes from here before looking on
    // disk.
    pub(crate) exclusive_files: Mutex<HashMap<FileId, ExclusiveFile>>,
    // The directory this handle operates on.
    pub(crate) dir: Dir,
    // NOTE(review): file clone cache; its users are outside this file.
    pub(crate) clones: Mutex<FileCloneCache>,
    // Limits last installed with `set_instance_limits` (defaults until then).
    pub(crate) instance_limits: Limits,
    // Sender used by `send_values_for_delete` to hand value locations to the puncher thread.
    deleted_values: Option<DeletedValuesSender>,
    // Join handle of the value puncher thread spawned in `new`. It is never joined in this file.
    _value_puncher: Option<thread::JoinHandle<()>>,
    // Receiver whose sender is owned by the puncher thread; it disconnects when that thread ends.
    value_puncher_done: ValuePuncherDone,
}
28
/// Integer type used for the manifest database's SQLite `user_version` pragma value.
type ManifestUserVersion = u32;
31
32impl Handle {
    /// Returns whether the handle's directory reports support for file cloning.
    pub fn dir_supports_file_cloning(&self) -> bool {
        self.dir.supports_file_cloning()
    }
37
    /// Replaces this handle's limits and applies them.
    ///
    /// The new limits are stored on the handle first; then a deferred transaction is started and
    /// `apply_limits` is called on it.
    ///
    /// # Errors
    /// Returns an error if the transaction cannot be started or if `apply_limits` fails. The new
    /// limits remain stored on the handle in either case.
    pub fn set_instance_limits(&mut self, limits: Limits) -> Result<()> {
        self.instance_limits = limits;
        self.start_deferred_transaction()?.apply_limits()
    }
42
    /// Returns the path of the directory this handle operates on.
    pub fn dir(&self) -> &Path {
        self.dir.as_ref()
    }
46
47 pub(crate) fn get_exclusive_file(&self) -> Result<ExclusiveFile> {
48 {
49 let mut files = self.exclusive_files.lock().unwrap();
50 if let Some(id) = files.keys().next().cloned() {
53 let file = files.remove(&id).unwrap();
54 debug_assert_eq!(id, file.id);
55 debug!("using exclusive file {} from handle", &file.id);
56 return Ok(file);
57 }
58 }
59 trace!("about to open existing files");
60 if let Some(file) = self.open_existing_exclusive_file()? {
61 debug!("opened existing values file {}", file.id);
62 return Ok(file);
63 }
64 trace!("here");
65 let ret = ExclusiveFile::new(&self.dir);
66 if let Ok(file) = &ret {
67 debug!("created new exclusive file {}", file.id);
68 }
69 ret
70 }
71
72 fn open_existing_exclusive_file(&self) -> Result<Option<ExclusiveFile>> {
73 for res in read_dir(&self.dir)? {
74 let entry = res?;
75 if !entry.file_type()?.is_file() {
76 continue;
77 }
78 if !ExclusiveFile::valid_file_name(entry.file_name().to_str().unwrap()) {
79 continue;
80 }
81 let path = entry.path();
82 debug!(?path, "opening existing file");
83 match ExclusiveFile::open(path.clone()) {
84 Ok(ef) => return Ok(ef),
85 Err(err) => {
86 debug!(?path, ?err, "open");
87 }
88 }
89 }
90 Ok(None)
91 }
92
    /// Manifest schema version recorded in the database's `user_version` pragma. A database
    /// reporting a lower value is reset and re-initialized by `init_sqlite_conn`.
    const USER_VERSION: u32 = 3;
95
96 pub fn new(dir: PathBuf) -> Result<Self> {
97 let sqlite_version = rusqlite::version_number();
98 if sqlite_version < 3042000 {
100 bail!(
101 "sqlite version {} below minimum {}",
102 rusqlite::version(),
103 "3.42"
104 );
105 }
106 let dir = Dir::new(dir).context("new Dir")?;
107 let mut conn = Connection::open(dir.path().join(MANIFEST_DB_FILE_NAME))?;
108 Self::init_sqlite_conn(&mut conn, &dir)?;
109 let (deleted_values, receiver) = sync::mpsc::sync_channel(10);
110 let (value_puncher_done_sender, value_puncher_done) = sync::mpsc::sync_channel(0);
111 let value_puncher_done = ValuePuncherDone(Arc::new(Mutex::new(value_puncher_done)));
112 let handle = Self {
113 conn: Mutex::new(conn),
114 exclusive_files: Default::default(),
115 dir: dir.clone(),
116 clones: Default::default(),
117 instance_limits: Default::default(),
118 deleted_values: Some(deleted_values),
119 _value_puncher: Some(thread::spawn(move || -> () {
122 let _value_puncher_done_sender = value_puncher_done_sender;
123 if let Err(err) = Self::value_puncher(dir, receiver) {
124 error!("value puncher thread failed with {err:?}");
125 }
126 })),
127 value_puncher_done,
128 };
129 Ok(handle)
130 }
131
132 fn retry_while_busy<T>(mut f: impl FnMut() -> rusqlite::Result<T>) -> rusqlite::Result<T> {
133 loop {
134 match f() {
135 Err(rusqlite::Error::SqliteFailure(err, _))
136 if err.code == rusqlite::ErrorCode::DatabaseBusy =>
137 {
138 std::thread::sleep(Duration::from_secs(1));
139 }
140 default => return default,
141 }
142 }
143 }
144
    /// Prepares a freshly opened manifest connection, resetting and re-creating the schema when
    /// the stored `user_version` is older than [`Self::USER_VERSION`].
    ///
    /// When a reset happens, all values files in `dir` are deleted as well.
    ///
    /// # Errors
    /// Propagates any SQLite or filesystem error from the steps below.
    ///
    /// # Panics
    /// Panics if SQLite does not report `normal` after the locking mode is switched back.
    fn init_sqlite_conn(conn: &mut Connection, dir: &Dir) -> anyhow::Result<()> {
        // Only this first statement is retried while the database reports busy.
        Self::retry_while_busy(|| conn.pragma_update(None, "synchronous", "off"))?;

        let get_user_version = |conn: &Connection| -> Result<ManifestUserVersion, _> {
            conn.pragma_query_value(None, "user_version", |row| row.get(0))
        };
        let user_version: ManifestUserVersion = get_user_version(conn)?;
        if user_version >= Self::USER_VERSION {
            // Up to date already: none of the pragmas below are touched on this path.
            return Ok(());
        }
        conn.pragma_update(None, "journal_mode", "delete")?;
        conn.pragma_update(None, "locking_mode", "exclusive")?;
        // NOTE(review): the version is read again after switching to exclusive locking,
        // presumably so the check is repeated in case another connection initialized the
        // database in the meantime -- confirm.
        let user_version = get_user_version(conn)?;
        if user_version < Self::USER_VERSION {
            use rusqlite::config::DbConfig::SQLITE_DBCONFIG_RESET_DATABASE;
            // Vacuuming with RESET_DATABASE enabled is SQLite's procedure for resetting a
            // database to empty; the flag is switched off again straight afterwards.
            conn.set_db_config(SQLITE_DBCONFIG_RESET_DATABASE, true)?;
            conn.execute("vacuum", [])?;
            conn.set_db_config(SQLITE_DBCONFIG_RESET_DATABASE, false)?;
            Self::delete_all_values_files(dir)?;
            init_manifest_schema(conn)?;
            // Written last, after the schema has been created.
            conn.pragma_update(None, "user_version", Self::USER_VERSION)?;
        }
        let mode: String =
            conn.pragma_update_and_check(None, "locking_mode", "normal", |row| row.get(0))?;
        assert_eq!(mode, "normal");
        conn.pragma_update(None, "journal_mode", "wal")?;
        Ok(())
    }
188
189 fn delete_all_values_files(dir: &Dir) -> anyhow::Result<()> {
191 for entry in dir.walk_dir()? {
192 let path = &entry.path;
193 if !matches!(entry.entry_type, EntryType::ValuesFile) {
194 continue;
195 }
196 let file = OpenOptions::new().write(true).open(path)?;
197 if !file.lock_max_segment(LockExclusiveNonblock)? {
198 warn!(?path, "file for deletion is locked. blocking");
199 assert!(file.lock_max_segment(LockExclusive)?);
200 }
201 debug!(?path, "deleting file");
202 remove_file(path)?;
203 }
204 Ok(())
205 }
206
    /// Calls `delete_unused_snapshots` on the handle's directory path, converting any error into
    /// the public error type.
    pub fn cleanup_snapshots(&self) -> PubResult<()> {
        delete_unused_snapshots(self.dir.path()).map_err(Into::into)
    }
210
    /// Returns the block size reported by the handle's directory.
    pub fn block_size(&self) -> u64 {
        self.dir.block_size()
    }
214
    /// Creates a [`BatchWriter`] that borrows this handle. As written this always returns `Ok`.
    pub fn new_writer(&self) -> Result<BatchWriter<&Handle>> {
        Ok(BatchWriter::new(self))
    }
218
    /// Starts a writable transaction with `TransactionBehavior::Immediate`.
    pub(crate) fn start_immediate_transaction(&self) -> rusqlite::Result<OwnedTx> {
        self.start_writable_transaction_with_behaviour(TransactionBehavior::Immediate)
    }
222
    /// Starts a transaction on the handle's connection with the given behaviour and wraps it,
    /// together with the connection lock, in an [`OwnedTx`].
    ///
    /// # Errors
    /// Returns the `rusqlite` error if the transaction cannot be started.
    pub(crate) fn start_writable_transaction_with_behaviour(
        &self,
        behaviour: TransactionBehavior,
    ) -> rusqlite::Result<OwnedTx> {
        Ok(self
            .start_transaction(|conn, handle| {
                // The transaction is opened inside `run_blocking`; `CanSend` wraps the result so
                // it can be returned from that closure and is unwrapped again right below.
                let tx_res = run_blocking(|| {
                    conn.transaction_with_behavior(behaviour).map(CanSend)
                });
                let rtx = tx_res?.0;
                Ok(Transaction::new(rtx, handle))
            })?
            .into())
    }
242
243 pub fn start_deferred_transaction_for_read(&self) -> rusqlite::Result<OwnedReadTx> {
246 Ok(self
247 .start_transaction(|conn, _handle| {
248 let rtx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
249 Ok(ReadTransactionOwned(rtx))
250 })?
251 .into())
252 }
253
    /// Starts a writable transaction with `TransactionBehavior::Deferred`.
    pub(crate) fn start_deferred_transaction(&self) -> rusqlite::Result<OwnedTx> {
        self.start_writable_transaction_with_behaviour(TransactionBehavior::Deferred)
    }
261
262 pub fn read(&self) -> rusqlite::Result<Reader<OwnedTx>> {
264 let reader = Reader {
265 owned_tx: self
266 .start_writable_transaction_with_behaviour(TransactionBehavior::Immediate)?,
267 reads: Default::default(),
268 };
269 Ok(reader)
270 }
271
272 pub fn read_single(&self, key: &[u8]) -> Result<Option<SnapshotValue<Value>>> {
273 let mut reader = self.read()?;
274 let Some(value) = reader.add(key)? else {
275 return Ok(None);
276 };
277 let snapshot = reader.begin()?;
278 Ok(Some(snapshot.value(value)))
279 }
280
281 pub fn single_write_from(
282 &self,
283 key: Vec<u8>,
284 r: impl Read,
285 ) -> Result<(u64, WriteCommitResult)> {
286 let mut writer = self.new_writer()?;
287 let mut value = writer.new_value().begin()?;
288 trace!("got value writer");
289 let n = value.copy_from(r)?;
290 writer.stage_write(key, value)?;
291 let commit = writer.commit()?;
292 Ok((n, commit))
293 }
294
295 pub fn single_delete(&self, key: &[u8]) -> PubResult<Option<c_api::PossumStat>> {
296 let mut tx = self.start_deferred_transaction()?;
297 let deleted = tx.delete_key(key)?;
298 if deleted.is_some() {
301 tx.commit()?.complete();
302 }
303 Ok(deleted)
304 }
305
306 pub fn clone_from_file(&mut self, key: Vec<u8>, file: &mut File) -> Result<u64> {
307 let mut writer = self.new_writer()?;
308 let mut value = writer.new_value().clone_file(file)?;
309 let n = value.value_length()?;
310 writer.stage_write(key, value)?;
311 writer.commit()?;
312 Ok(n)
313 }
314
315 pub fn rename_item(&mut self, from: &[u8], to: &[u8]) -> PubResult<Timestamp> {
316 let mut tx = self.start_immediate_transaction()?;
317 let last_used = tx.rename_item(from, to)?;
318 tx.commit()?.complete();
319 Ok(last_used)
320 }
321
    /// Walks the handle's directory with `crate::walk::walk_dir` and returns the entries found.
    pub fn walk_dir(&self) -> Result<Vec<walk::Entry>> {
        crate::walk::walk_dir(&self.dir)
    }
326
    /// Starts a deferred read transaction and returns the result of its `list_items(prefix)`.
    pub fn list_items(&self, prefix: &[u8]) -> PubResult<Vec<Item>> {
        self.start_deferred_transaction_for_read()?
            .list_items(prefix)
    }
331
    /// Body of the value puncher thread.
    ///
    /// Receives batches of value locations over `values_receiver` and passes them to
    /// [`Self::punch_values`]. Locations that `punch_values` hands back are kept and retried.
    /// The loop ends once the channel has disconnected and nothing is left pending.
    ///
    /// # Errors
    /// Returns on the first error from opening the connection, starting a transaction, or
    /// punching values.
    fn value_puncher(
        dir: Dir,
        values_receiver: sync::mpsc::Receiver<Vec<NonzeroValueLocation>>,
    ) -> Result<()> {
        let manifest_path = dir.path().join(MANIFEST_DB_FILE_NAME);
        use rusqlite::OpenFlags;
        // The thread uses its own read-only connection rather than the handle's connection.
        let mut conn = Connection::open_with_flags(
            manifest_path,
            OpenFlags::SQLITE_OPEN_READ_ONLY
                | OpenFlags::SQLITE_OPEN_NO_MUTEX
                | OpenFlags::SQLITE_OPEN_URI,
        )?;
        const RETRY_DURATION: Duration = Duration::from_secs(1);
        let mut pending_values: Vec<_> = Default::default();
        // Set to `None` once the sending side has disconnected.
        let mut values_receiver_opt = Some(values_receiver);
        while values_receiver_opt.is_some() || !pending_values.is_empty() {
            match &values_receiver_opt {
                Some(values_receiver) => {
                    // With nothing pending there is no reason to wake up before new values
                    // arrive. With values pending, wake up periodically to retry them.
                    let timeout = if pending_values.is_empty() {
                        Duration::MAX
                    } else {
                        RETRY_DURATION
                    };
                    let recv_result = values_receiver.recv_timeout(timeout);
                    use std::sync::mpsc::RecvTimeoutError;
                    match recv_result {
                        Ok(mut values) => {
                            pending_values.append(&mut values);
                            // Drain whatever else is already queued without blocking.
                            while let Ok(more_values) = values_receiver.try_recv() {
                                pending_values.extend(more_values);
                            }
                        }
                        Err(RecvTimeoutError::Timeout) => {}
                        Err(RecvTimeoutError::Disconnected) => {
                            values_receiver_opt = None;
                        }
                    }
                }
                None => {
                    // No more input will arrive; pace the retries of what is still pending.
                    std::thread::sleep(RETRY_DURATION);
                }
            }
            // A fresh deferred transaction is used for every punching pass.
            let tx = conn.transaction_with_behavior(TransactionBehavior::Deferred)?;
            let tx = ReadTransactionOwned(tx);
            pending_values = Self::punch_values(&dir, pending_values, &tx)?;
            // Punching is expected not to have written through this transaction.
            debug_assert_ne!(tx.0.transaction_state(None)?, TransactionState::Write);
        }
        Ok(())
    }
384
385 pub(crate) fn punch_values(
388 dir: &Dir,
389 values: Vec<NonzeroValueLocation>,
390 transaction: &ReadTransactionOwned,
391 ) -> PubResult<Vec<NonzeroValueLocation>> {
392 let mut failed = Vec::with_capacity(values.len());
393 for v in values {
394 let NonzeroValueLocation {
395 file_id,
396 file_offset,
397 length,
398 } = &v;
399 let value_length = length;
400 let msg = format!(
401 "deleting value at {:?} {} {}",
402 file_id, file_offset, value_length
403 );
404 debug!("{}", msg);
405 if !punch_value(PunchValueOptions {
407 dir: dir.path(),
408 file_id,
409 offset: *file_offset,
410 length: *value_length,
411 tx: transaction,
412 block_size: dir.block_size(),
413 constraints: Default::default(),
414 })
415 .context(msg)?
416 {
417 failed.push(v);
418 }
419 }
420 Ok(failed)
421 }
422
423 pub(crate) fn send_values_for_delete(&self, values: Vec<NonzeroValueLocation>) {
424 use std::sync::mpsc::TrySendError::*;
425 let sender = self.deleted_values.as_ref().unwrap();
426 match sender.try_send(values) {
427 Ok(()) => (),
428 Err(Disconnected(values)) => {
429 error!("sending {values:?}: channel disconnected");
430 }
431 Err(Full(values)) => {
432 warn!("channel full while sending values. blocking.");
433 sender.send(values).unwrap()
434 }
435 }
436 }
437
    /// Returns a new [`ValuePuncherDone`] sharing the same underlying receiver as the handle's
    /// own (the inner `Arc` is cloned).
    pub fn get_value_puncher_done(&self) -> ValuePuncherDone {
        ValuePuncherDone(Arc::clone(&self.value_puncher_done.0))
    }
442
443 pub fn move_prefix(&self, from: &[u8], to: &[u8]) -> Result<()> {
444 let mut tx = self.start_deferred_transaction()?;
445 let items = tx.list_items(from)?;
446 let mut to_vec = to.to_vec();
447 for item in items {
448 to_vec.truncate(to.len());
449 to_vec.extend_from_slice(item.key.strip_prefix(from).unwrap());
450 tx.rename_item(&item.key, &to_vec)?;
451 }
452 tx.commit()?.complete();
453 Ok(())
454 }
455
456 pub fn delete_prefix(&self, prefix: impl AsRef<[u8]>) -> PubResult<()> {
457 let mut tx = self.start_deferred_transaction()?;
458 for item in tx.list_items(prefix.as_ref())? {
459 tx.delete_key(&item.key)?;
460 }
461 tx.commit()?.complete();
462 Ok(())
463 }
464}
465
466use item::Item;
467
468use crate::c_api::{PossumHandle, PossumHandleRc};
469use crate::dir::Dir;
470use crate::owned_cell::{MutOwnedCell, OwnedCell};
471use crate::ownedtx::{OwnedReadTx, OwnedTxInner};
472use crate::tx::ReadTransaction;
473use crate::walk::EntryType;
474
// The body is empty, so dropping a `Handle` only drops its fields. In particular the value
// puncher thread is not joined here; callers that need to wait for it can use
// `ValuePuncherDone::wait`.
impl Drop for Handle {
    fn drop(&mut self) {
    }
}
484
/// Shared receiving end of a channel on which nothing is ever expected to be sent; it exists so
/// that its disconnection can be waited for.
#[derive(Debug)]
pub struct ValuePuncherDone(Arc<Mutex<sync::mpsc::Receiver<()>>>);

impl ValuePuncherDone {
    /// Blocks until the sending side of the channel has been dropped.
    ///
    /// # Panics
    /// Panics if a message is received instead of a disconnect, or if the mutex is poisoned.
    pub fn wait(&self) {
        // The guard is a temporary of this statement, so the lock is released before asserting.
        let outcome = self.0.lock().unwrap().recv();
        assert!(matches!(outcome, Err(std::sync::mpsc::RecvError)))
    }
}
496
/// Abstraction over the ways a transaction of type `T` can be started from something that gives
/// access to a [`Handle`].
///
/// Implementors lock the handle's connection, pass it to `make_tx`, and return a value that owns
/// both the lock(s) and the transaction built from them.
pub(crate) trait StartTransaction<'h, T> {
    /// The returned value that keeps the locks and the transaction together.
    type Owned;
    /// The handle representation passed to `make_tx` alongside the connection.
    type TxHandle;
    /// Locks the connection and builds the transaction with `make_tx`.
    ///
    /// Any error from `make_tx` is returned unchanged.
    fn start_transaction(
        self,
        make_tx: impl FnOnce(&'h mut Connection, Self::TxHandle) -> rusqlite::Result<T>,
    ) -> rusqlite::Result<Self::Owned>;
}
507
/// Starts transactions from a plain borrowed [`Handle`]: the result owns the connection mutex
/// guard together with the transaction.
impl<'h, T> StartTransaction<'h, T> for &'h Handle {
    type Owned = OwnedTxInner<'h, T>;
    type TxHandle = &'h Handle;
    fn start_transaction(
        self,
        make_tx: impl FnOnce(&'h mut Connection, Self::TxHandle) -> rusqlite::Result<T>,
    ) -> rusqlite::Result<Self::Owned> {
        // Panics if the connection mutex is poisoned.
        let guard = self.conn.lock().unwrap();
        MutOwnedCell::try_make(guard, |conn| make_tx(conn, self))
    }
}
519
/// Starts transactions from a reference-counted, lock-protected handle.
///
/// The result nests three owners, outermost first: the `PossumHandleRc` itself, a shared read
/// guard on the handle, and the connection mutex guard together with the transaction.
impl<'h, T> StartTransaction<'h, T> for PossumHandleRc {
    type Owned = OwnedCell<
        Self,
        OwnedCell<Rc<RwLockReadGuard<'h, Handle>>, MutOwnedCell<MutexGuard<'h, Connection>, T>>,
    >;
    type TxHandle = Rc<RwLockReadGuard<'h, Handle>>;
    fn start_transaction(
        self,
        make_tx: impl FnOnce(&'h mut Connection, Self::TxHandle) -> rusqlite::Result<T>,
    ) -> rusqlite::Result<Self::Owned> {
        OwnedCell::try_make(self, |handle_lock| {
            // The read guard is shared through an `Rc`: one clone becomes the owner in the
            // middle cell, the other is handed to `make_tx` as the transaction's handle.
            let handle_guard = Rc::new(handle_lock.read().unwrap());
            OwnedCell::try_make(handle_guard.clone(), |handle| {
                MutOwnedCell::try_make(handle.conn.lock().unwrap(), |conn| {
                    make_tx(conn, handle_guard)
                })
            })
        })
    }
}
541
/// Gives temporary access to a [`Handle`] regardless of how the implementor stores it.
pub trait WithHandle {
    /// Calls `f` with a reference to the handle and returns its result.
    fn with_handle<R>(&self, f: impl FnOnce(&Handle) -> R) -> R;
}
545
/// A borrowed handle is passed straight through to `f`.
impl WithHandle for &Handle {
    fn with_handle<R>(&self, f: impl FnOnce(&Handle) -> R) -> R {
        f(self)
    }
}
551
/// Takes the read lock for the duration of `f`. Panics if the lock is poisoned.
impl WithHandle for PossumHandle {
    fn with_handle<R>(&self, f: impl FnOnce(&Handle) -> R) -> R {
        f(&self.read().unwrap())
    }
}
557
/// Identity conversion, so code generic over `AsRef<Handle>` accepts a `Handle` directly.
impl AsRef<Handle> for Handle {
    fn as_ref(&self) -> &Handle {
        self
    }
}
563
/// Reaches the `Handle` behind a shared read guard.
impl AsRef<Handle> for Rc<RwLockReadGuard<'_, Handle>> {
    fn as_ref(&self) -> &Handle {
        self.deref()
    }
}
569
/// Wrapper that unconditionally marks its contents as `Send`.
struct CanSend<T>(T);

// SAFETY: NOTE(review): this asserts `Send` for every `T` with no bound, so soundness rests
// entirely on how the wrapper is used. In this file it only carries the result of
// `transaction_with_behavior` out of the `run_blocking` closure. Confirm that `run_blocking`
// never lets the wrapped value be used from two threads at once.
unsafe impl<T> Send for CanSend<T> {}