1pub mod cache;
55mod cidbytes;
56mod db;
57mod error;
58#[cfg(test)]
59mod tests;
60mod transaction;
61
62use cache::{CacheTracker, NoopCacheTracker};
63use db::*;
64use error::Context;
65pub use error::{BlockStoreError, Result};
66use libipld::{codec::References, store::StoreParams, Block, Cid, Ipld};
67use parking_lot::Mutex;
68use rusqlite::{Connection, DatabaseName, OpenFlags};
69use std::{
70 borrow::Cow,
71 collections::HashSet,
72 fmt,
73 iter::FromIterator,
74 marker::PhantomData,
75 mem,
76 ops::DerefMut,
77 path::{Path, PathBuf},
78 sync::{
79 atomic::{AtomicBool, Ordering},
80 Arc,
81 },
82 time::Duration,
83};
84use tracing::*;
85pub use transaction::Transaction;
86
/// Where the SQLite database of a block store lives.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum DbPath {
    /// A database stored in the file at the given path.
    File(PathBuf),
    /// A database that is opened purely in memory.
    Memory,
}

impl DbPath {
    /// Returns `true` when this value designates the in-memory database.
    fn is_memory(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly.
        match self {
            DbPath::Memory => true,
            DbPath::File(_) => false,
        }
    }
}
98
99#[derive(Debug, Clone, Copy, Default)]
107pub struct SizeTargets {
108 pub count: u64,
117
118 pub size: u64,
126}
127
128impl SizeTargets {
129 pub fn new(count: u64, size: u64) -> Self {
130 Self { count, size }
131 }
132
133 pub fn exceeded(&self, stats: &StoreStats) -> bool {
134 stats.count > self.count || stats.size > self.size
135 }
136
137 pub fn max_value() -> Self {
139 Self {
140 count: u64::max_value(),
141 size: u64::max_value(),
142 }
143 }
144}
145
/// Value used for SQLite's `synchronous` pragma.
///
/// The [`fmt::Display`] form is the keyword that is handed to the pragma.
#[derive(Debug, Clone, Copy)]
pub enum Synchronous {
    /// Rendered as `FULL`.
    Full,
    /// Rendered as `NORMAL`.
    Normal,
    /// Rendered as `OFF`.
    Off,
}

impl fmt::Display for Synchronous {
    /// Writes the upper-case pragma keyword for this mode.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match *self {
            Synchronous::Full => "FULL",
            Synchronous::Normal => "NORMAL",
            Synchronous::Off => "OFF",
        };
        f.write_str(keyword)
    }
}
163
164#[derive(Debug, Clone)]
165pub struct Config {
166 size_targets: SizeTargets,
167 cache_tracker: Arc<dyn CacheTracker>,
168 pragma_synchronous: Synchronous,
169 pragma_cache_pages: u64,
170 read_only: bool,
172 create: bool,
174}
175
176impl Default for Config {
177 fn default() -> Self {
178 Self {
179 size_targets: Default::default(),
180 cache_tracker: Arc::new(NoopCacheTracker),
181 pragma_synchronous: Synchronous::Full, pragma_cache_pages: 8192, read_only: false,
184 create: true,
185 }
186 }
187}
188
189impl Config {
190 pub fn with_read_only(mut self, value: bool) -> Self {
191 self.read_only = value;
192 self
193 }
194 pub fn with_size_targets(mut self, count: u64, size: u64) -> Self {
196 self.size_targets = SizeTargets { count, size };
197 self
198 }
199 pub fn with_cache_tracker<T: CacheTracker + 'static>(mut self, cache_tracker: T) -> Self {
201 self.cache_tracker = Arc::new(cache_tracker);
202 self
203 }
204 pub fn with_pragma_synchronous(mut self, value: Synchronous) -> Self {
205 self.pragma_synchronous = value;
206 self
207 }
208 pub fn with_pragma_cache_pages(mut self, value: u64) -> Self {
209 self.pragma_cache_pages = value;
210 self
211 }
212}
213
214pub struct BlockStore<S> {
215 conn: Connection,
216 expired_temp_pins: Arc<Mutex<Vec<i64>>>,
217 config: Config,
218 db_path: DbPath,
219 recompute_done: Arc<AtomicBool>,
220 _s: PhantomData<S>,
221}
222
/// A snapshot of store statistics.
///
/// All values are plain counters; the default snapshot is all zeroes.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    /// Block count compared against `SizeTargets::count`.
    count: u64,
    /// Size compared against `SizeTargets::size`.
    size: u64,
    /// Presumably the SQLite page size — confirm where the stats are filled.
    page_size: u64,
    /// Presumably the number of database pages in use — confirm as above.
    used_pages: u64,
    /// Presumably the number of free database pages — confirm as above.
    free_pages: u64,
}

impl StoreStats {
    /// Returns the recorded block count.
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Returns the recorded size.
    pub fn size(&self) -> u64 {
        self.size
    }

    /// Returns the recorded `page_size` value.
    pub fn page_size(&self) -> u64 {
        self.page_size
    }

    /// Returns the recorded `used_pages` value.
    pub fn used_pages(&self) -> u64 {
        self.used_pages
    }

    /// Returns the recorded `free_pages` value.
    pub fn free_pages(&self) -> u64 {
        self.free_pages
    }
}
266
267pub struct TempPin {
272 id: i64,
273 expired_temp_pins: Arc<Mutex<Vec<i64>>>,
274}
275
276impl TempPin {
277 fn new(expired_temp_pins: Arc<Mutex<Vec<i64>>>) -> Self {
278 Self {
279 id: 0,
280 expired_temp_pins,
281 }
282 }
283}
284
285impl fmt::Debug for TempPin {
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288 let mut builder = f.debug_struct("TempAlias");
289 if self.id > 0 {
290 builder.field("id", &self.id);
291 } else {
292 builder.field("unused", &true);
293 }
294 builder.finish()
295 }
296}
297
298impl Drop for TempPin {
299 fn drop(&mut self) {
300 if self.id > 0 {
301 self.expired_temp_pins.lock().push(self.id);
302 }
303 }
304}
305
306impl<S> BlockStore<S>
307where
308 S: StoreParams,
309 Ipld: References<S::Codecs>,
310{
311 fn create_connection(db_path: DbPath, config: &Config) -> crate::Result<rusqlite::Connection> {
312 let mut flags = OpenFlags::SQLITE_OPEN_NO_MUTEX | OpenFlags::SQLITE_OPEN_URI;
313 flags |= if config.read_only {
314 OpenFlags::SQLITE_OPEN_READ_ONLY
315 } else {
316 OpenFlags::SQLITE_OPEN_READ_WRITE
317 };
318 if config.create && !config.read_only {
319 flags |= OpenFlags::SQLITE_OPEN_CREATE
320 }
321 let conn = match db_path {
322 DbPath::Memory => Connection::open_in_memory().ctx("opening in-memory DB")?,
323 DbPath::File(path) => Connection::open_with_flags(path, flags).ctx("opening DB")?,
324 };
325 Ok(conn)
326 }
327
328 pub fn open_path(db_path: DbPath, config: Config) -> crate::Result<Self> {
329 let is_memory = db_path.is_memory();
330 let mut conn = Self::create_connection(db_path.clone(), &config)?;
331 conn.execute_batch("PRAGMA journal_mode = WAL")
333 .ctx("setting WAL mode")?;
334 init_db(
335 &mut conn,
336 is_memory,
337 config.pragma_cache_pages as i64,
338 config.pragma_synchronous,
339 )?;
340 let mut this = Self {
341 conn,
342 expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
343 config,
344 db_path,
345 recompute_done: Arc::new(AtomicBool::new(false)),
346 _s: PhantomData,
347 };
348 if !is_memory {
349 let mut conn = this.additional_connection()?;
350 std::thread::spawn(move || {
351 if let Err(e) = recompute_store_stats(&mut conn.conn) {
352 tracing::error!("cannot recompute store stats: {}", e);
353 }
354 conn.recompute_done.store(true, Ordering::SeqCst);
358 });
359 } else {
360 this.recompute_done.store(true, Ordering::SeqCst);
361 }
362 if this.config.cache_tracker.has_persistent_state() {
363 let ids = in_txn(
364 &mut this.conn,
365 Some(("get IDs", Duration::from_secs(1))),
366 false,
367 get_ids,
368 )?;
369 this.config.cache_tracker.retain_ids(&ids);
370 }
371 Ok(this)
372 }
373
374 pub fn additional_connection(&self) -> crate::Result<Self> {
378 if self.db_path.is_memory() {
379 return Err(BlockStoreError::NoAdditionalInMemory);
380 }
381 let mut conn = Self::create_connection(self.db_path.clone(), &self.config)?;
382 init_pragmas(
383 &mut conn,
384 self.db_path.is_memory(),
385 self.config.pragma_cache_pages as i64,
386 )?;
387 conn.pragma_update(
388 None,
389 "synchronous",
390 &self.config.pragma_synchronous.to_string(),
391 )
392 .ctx("setting synchronous mode")?;
393 Ok(Self {
394 conn,
395 expired_temp_pins: self.expired_temp_pins.clone(),
396 config: self.config.clone(),
397 db_path: self.db_path.clone(),
398 recompute_done: self.recompute_done.clone(),
399 _s: PhantomData,
400 })
401 }
402
403 pub fn memory(config: Config) -> crate::Result<Self> {
405 Self::open_path(DbPath::Memory, config)
406 }
407
408 pub fn open(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
410 let mut pb: PathBuf = PathBuf::new();
411 pb.push(path);
412 Self::open_path(DbPath::File(pb), config)
413 }
414
415 pub fn open_test(path: impl AsRef<Path>, config: Config) -> crate::Result<Self> {
420 let mut conn = Self::create_connection(DbPath::Memory, &config)?;
421 debug!(
422 "Restoring in memory database from {}",
423 path.as_ref().display()
424 );
425 conn.restore(
426 DatabaseName::Main,
427 path,
428 Some(|p: rusqlite::backup::Progress| {
429 let percent = if p.pagecount == 0 {
430 100
431 } else {
432 (p.pagecount - p.remaining) * 100 / p.pagecount
433 };
434 if percent % 10 == 0 {
435 debug!("Restoring: {} %", percent);
436 }
437 }),
438 )
439 .ctx("restoring test DB from backup")?;
440 let ids = in_txn(
441 &mut conn,
442 Some(("get ids", Duration::from_secs(1))),
443 false,
444 get_ids,
445 )?;
446 config.cache_tracker.retain_ids(&ids);
447 Ok(Self {
448 conn,
449 expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
450 config,
451 db_path: DbPath::Memory,
452 recompute_done: Arc::new(AtomicBool::new(true)),
453 _s: PhantomData,
454 })
455 }
456
457 pub fn backup(&mut self, path: PathBuf) -> Result<()> {
458 in_txn(&mut self.conn, None, false, move |txn| {
459 txn.backup(DatabaseName::Main, path.as_path(), None)
460 .ctx("backing up DB")
461 })
462 }
463
464 pub fn flush(&mut self) -> crate::Result<()> {
465 in_txn(&mut self.conn, None, false, |txn| {
466 txn.pragma_update(None, "wal_checkpoint", &"TRUNCATE")
467 .ctx("flushing WAL")
468 })
469 }
470
471 pub fn integrity_check(&mut self) -> crate::Result<()> {
472 let result = integrity_check(&mut self.conn)?;
473 if result == vec!["ok".to_owned()] {
474 Ok(())
475 } else {
476 let error_text = result.join(";");
477 Err(crate::error::BlockStoreError::SqliteError(
478 rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(11), Some(error_text)),
479 "checking integrity",
480 ))
481 }
482 }
484
485 pub fn transaction(&mut self) -> Transaction<'_, S> {
486 Transaction::new(self)
487 }
488
489 pub fn temp_pin(&self) -> TempPin {
491 TempPin::new(self.expired_temp_pins.clone())
492 }
493
494 pub fn vacuum(&mut self) -> Result<()> {
498 vacuum(&mut self.conn)
499 }
500
501 pub fn cleanup_temp_pins(&mut self) -> Result<()> {
505 let expired_temp_pins = mem::take(self.expired_temp_pins.lock().deref_mut());
507 in_txn(
508 &mut self.conn,
509 Some(("dropping expired temp_pins", Duration::from_millis(100))),
510 true,
511 move |txn| {
512 for id in expired_temp_pins.iter() {
514 delete_temp_pin(txn, *id)?;
515 }
516 Ok(())
517 },
518 )
519 }
520
521 pub fn gc(&mut self) -> Result<()> {
525 self.cleanup_temp_pins()?;
526 self.flush()?;
527 incremental_gc(
528 &mut self.conn,
529 usize::MAX,
530 Duration::from_secs(u32::MAX.into()),
531 self.config.size_targets,
532 &self.config.cache_tracker,
533 )?;
534 self.vacuum()?;
535 Ok(())
536 }
537
538 fn maybe_checkpoint(&mut self) -> Result<()> {
539 if self.recompute_done.load(Ordering::SeqCst) {
540 self.conn
541 .pragma_update(None, "journal_size_limit", 10_000_000i64)
542 .ctx("setting journal_size_limit")?;
543 self.conn
544 .pragma_update(None, "wal_checkpoint", &"RESTART")
545 .ctx("running wal_checkpoint(RESTART)")?;
546 }
547 Ok(())
548 }
549
550 pub fn incremental_gc(&mut self, min_blocks: usize, max_duration: Duration) -> Result<bool> {
560 let stats = self.get_store_stats()?;
561 let _span = tracing::debug_span!("incGC", stats = ?&stats).entered();
562 self.cleanup_temp_pins()?;
563 self.maybe_checkpoint()?;
564 let ret = incremental_gc(
565 &mut self.conn,
566 min_blocks,
567 max_duration,
568 self.config.size_targets,
569 &self.config.cache_tracker,
570 )?;
571 self.maybe_checkpoint()?;
572 in_txn(
573 &mut self.conn,
574 Some(("incremental_vacuum", Duration::from_millis(500))),
575 false,
576 |txn| {
577 txn.execute_batch("PRAGMA incremental_vacuum")
578 .ctx("incremental vacuum")
579 },
580 )?;
581 Ok(ret)
582 }
583}
584
// Generates `pub fn` wrappers that run a single transaction method.
//
// Each entry has the form
// `name<T: Bound>(arg: Type, ...) -> ReturnType;` where the generic part is
// optional and at most one type parameter is supported. Attributes (including
// doc comments) placed before an entry are copied onto the generated method.
// The generated method opens a transaction with `self.transaction()`, calls
// the method of the same name on it, commits, and returns the value. If the
// call fails, the error is returned and `commit` is not called.
macro_rules! delegate {
    (
        $(
            $(#[$meta:meta])*
            $method:ident $(<$param:ident : $bound:path>)? ($($arg:ident : $arg_ty:ty),*) -> $output:ty;
        )+
    ) => {
        $(
            $(#[$meta])*
            pub fn $method $(<$param: $bound>)? (&mut self, $($arg: $arg_ty),*) -> $output {
                let mut txn = self.transaction();
                let value = txn.$method($($arg),*)?;
                txn.commit()?;
                Ok(value)
            }
        )+
    };
}
598
599impl<S> BlockStore<S>
600where
601 S: StoreParams,
602 Ipld: References<S::Codecs>,
603{
604 pub fn alias<'b>(
606 &mut self,
607 name: impl Into<Cow<'b, [u8]>>,
608 link: Option<&'b Cid>,
609 ) -> Result<()> {
610 self.transaction().alias(name, link)
611 }
612
613 pub fn resolve<'b>(&mut self, name: impl Into<Cow<'b, [u8]>>) -> Result<Option<Cid>> {
615 self.transaction().resolve(name)
616 }
617
618 delegate! {
619 reverse_alias(cid: &Cid) -> Result<Option<HashSet<Vec<u8>>>>;
621
622 extend_temp_pin(pin: &mut TempPin, link: &Cid) -> Result<()>;
624
625 has_cid(cid: &Cid) -> Result<bool>;
629
630 has_block(cid: &Cid) -> Result<bool>;
632
633 get_known_cids<C: FromIterator<Cid>>() -> Result<C>;
635
636 get_block_cids<C: FromIterator<Cid>>() -> Result<C>;
638
639 get_descendants<C: FromIterator<Cid>>(cid: &Cid) -> Result<C>;
641
642 get_missing_blocks<C: FromIterator<Cid>>(cid: &Cid) -> Result<C>;
644
645 aliases<C: FromIterator<(Vec<u8>, Cid)>>() -> Result<C>;
647
648 put_block(block: Block<S>, pin: Option<&mut TempPin>) -> Result<()>;
652
653 get_block(cid: &Cid) -> Result<Option<Vec<u8>>>;
655
656 get_store_stats() -> Result<StoreStats>;
660 }
661
662 pub fn put_blocks<I>(&mut self, blocks: I, mut pin: Option<&mut TempPin>) -> Result<()>
663 where
664 I: IntoIterator<Item = Block<S>>,
665 {
666 let mut txn = self.transaction();
667 for block in blocks {
668 #[allow(clippy::needless_option_as_deref)]
669 txn.put_block(block, pin.as_deref_mut())?;
670 }
671 txn.commit()
672 }
673}