//! # IPFS sqlite block store
//!
//! A block store for a rust implementation of [ipfs](https://ipfs.io/).
//!
//! # Concepts
//!
//! ## Aliases
//!
//! An alias is a named pin of a root. When a root is aliased, none of the leaves of the dag pointed
//! to by the root will be collected by gc. However, a root being aliased does not mean that the dag
//! must be complete.
//!
//! ## Temporary aliases
//!
//! A temporary alias is an unnamed alias that is just for the purpose of protecting blocks from gc
//! while a large tree is being constructed. While an alias maps a single name to a single root, a
//! temporary alias can be assigned to an arbitrary number of blocks before the dag is finished.
//!
//! A temporary alias will be deleted as soon as the handle goes out of scope.
//!
//! ## Garbage Collection (GC)
//!
//! GC refers to the process of removing unpinned blocks. It runs only when the configured size
//! targets are exceeded. [Size targets](SizeTargets) contain both the total size of the store
//! and the number of blocks.
//!
//! GC will run incrementally, deleting blocks until the size targets are no longer exceeded. The
//! order in which unpinned blocks will be deleted can be customized.
//!
//! ## Caching
//!
//! For unpinned blocks, it is possible to customize which blocks have the highest value using a
//! [CacheTracker](cache::CacheTracker). The default is to [do nothing](cache::NoopCacheTracker)
//! and has no performance overhead.
//!
//! The most elaborate implemented strategy is to keep track of access times in a separate database,
//! via the [SqliteCacheTracker](cache::SqliteCacheTracker), which has a slight performance overhead.
//!
//! The performance overhead of writing to an access tracking database on each block read can be
//! mitigated by using the [AsyncCacheTracker](cache::AsyncCacheTracker) wrapper to perform the
//! database writes on a different thread.
//!
//! # Usage
//!
//! ## Blocking
//!
//! For blocking usage, use [BlockStore](BlockStore). This is the lowest-level interface.
//!
//! ## Non-blocking
//!
//! For non-blocking usage, use [AsyncBlockStore](async_block_store::AsyncBlockStore). This is a
//! wrapper that is meant to be used from async rust. In addition to wrapping most methods of
//! [BlockStore], it provides a method [gc_loop](async_block_store::AsyncBlockStore::gc_loop) to
//! run gc continuously.
//!
//! # Major differences to the go-ipfs pinning concept
//!
//! - Pinning/aliasing a root does not require that the dag is complete
//! - Aliases/named pins as opposed to unnamed and non-reference-counted pins
//! - Temporary pins as a mechanism to keep blocks safe from gc while a tree is being constructed
pub mod async_block_store;
pub mod cache;
mod cidbytes;
mod db;
mod error;
#[cfg(test)]
mod tests;

use crate::cidbytes::CidBytes;
use cache::{BlockInfo, CacheTracker, NoopCacheTracker};
use db::*;
pub use error::{BlockStoreError, Result};
use libipld::cid::{self, Cid};
use rusqlite::{Connection, DatabaseName};
use std::{
    convert::TryFrom,
    fmt,
    iter::FromIterator,
    ops::DerefMut,
    path::Path,
    sync::{
        atomic::{AtomicI64, Ordering},
        Arc, Mutex,
    },
    time::Duration,
};
use tracing::*;

/// Size targets for a store. Gc of non-pinned blocks will start once one of the size targets is
/// exceeded.
///
/// There are targets for both block count and block size. The reason for this is that a store that
/// has a very large number of tiny blocks will become sluggish despite not having a large total
/// size.
///
/// Size targets only apply to non-pinned blocks. Pinned blocks will never be gced even if
/// exceeding one of the size targets.
#[derive(Debug, Clone, Copy, Default)]
pub struct SizeTargets {
    /// target number of blocks.
    ///
    /// Up to this number, the store will retain everything even if not pinned.
    /// Once this number is exceeded, the store will run garbage collection of all
    /// unpinned blocks until the block criterion is met again.
    ///
    /// To completely disable storing of non-pinned blocks, set this to 0.
    /// Even then, the store will never delete pinned blocks.
    pub count: u64,
    /// target store size.
    ///
    /// Up to this size, the store will retain everything even if not pinned.
    /// Once this size is exceeded, the store will run garbage collection of all
    /// unpinned blocks until the size criterion is met again.
    ///
    /// The store will never delete pinned blocks.
    pub size: u64,
}

impl SizeTargets {
    pub fn new(count: u64, size: u64) -> Self {
        Self { count, size }
    }

    pub fn exceeded(&self, stats: &StoreStats) -> bool {
        stats.count > self.count || stats.size > self.size
    }

    /// Size targets that cannot be reached. This can be used to disable gc.
    pub fn max_value() -> Self {
        Self {
            count: u64::max_value(),
            size: u64::max_value(),
        }
    }
}

#[derive(Debug)]
pub struct Config {
    size_targets: SizeTargets,
    cache_tracker: Box<dyn CacheTracker>,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            size_targets: Default::default(),
            cache_tracker: Box::new(NoopCacheTracker),
        }
    }
}

impl Config {
    /// Set size targets for the store
    pub fn with_size_targets(mut self, size_targets: SizeTargets) -> Self {
        self.size_targets = size_targets;
        self
    }

    /// Set strategy for which non-pinned blocks to keep in case one of the size targets is
    /// exceeded.
    pub fn with_cache_tracker<T: CacheTracker + 'static>(mut self, cache_tracker: T) -> Self {
        self.cache_tracker = Box::new(cache_tracker);
        self
    }
}

pub struct BlockStore {
    conn: Connection,
    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
    config: Config,
}

#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StoreStats {
    count: u64,
    size: u64,
}

impl StoreStats {
    /// Total number of blocks in the store
    pub fn count(&self) -> u64 {
        self.count
    }

    /// Total size of blocks in the store
    pub fn size(&self) -> u64 {
        self.size
    }
}

// do not implement Clone for this!
/// a handle that contains a temporary pin
///
/// dropping this handle enqueues the pin for dropping before the next gc.
pub struct TempPin {
    id: AtomicI64,
    expired_temp_pins: Arc<Mutex<Vec<i64>>>,
}

/// dump the temp alias id so you can find it in the database
impl fmt::Debug for TempPin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let id = self.id.load(Ordering::SeqCst);
        let mut builder = f.debug_struct("TempAlias");
        if id > 0 {
            builder.field("id", &id);
        }
        builder.finish()
    }
}

impl Drop for TempPin {
    fn drop(&mut self) {
        let id = self.id.get_mut();
        let alias = *id;
        if alias > 0 {
            // not sure if we have to guard against double drop, but it certainly does not hurt.
            *id = 0;
            self.expired_temp_pins.lock().unwrap().push(alias);
        }
    }
}

/// An ipfs block
pub trait Block {
    fn cid(&self) -> &Cid;
    fn data(&self) -> &[u8];
    fn links(&self) -> anyhow::Result<Vec<Cid>>;
}

/// Block that owns its data
pub struct OwnedBlock {
    cid: Cid,
    data: Vec<u8>,
    links: Vec<Cid>,
}

impl OwnedBlock {
    pub fn new(cid: Cid, data: Vec<u8>, links: Vec<Cid>) -> Self {
        Self { cid, data, links }
    }
}

impl Block for OwnedBlock {
    fn cid(&self) -> &Cid {
        &self.cid
    }

    fn data(&self) -> &[u8] {
        &self.data
    }

    fn links(&self) -> anyhow::Result<Vec<Cid>> {
        Ok(self.links.clone())
    }
}

struct BorrowedBlock<'a, F> {
    cid: Cid,
    data: &'a [u8],
    links: F,
}

impl<'a, F> BorrowedBlock<'a, F>
where
    F: Fn() -> anyhow::Result<Vec<Cid>>,
{
    fn new(cid: Cid, data: &'a [u8], links: F) -> Self {
        Self { cid, data, links }
    }
}

impl<'a, F> Block for BorrowedBlock<'a, F>
where
    F: Fn() -> anyhow::Result<Vec<Cid>>,
{
    fn cid(&self) -> &Cid {
        &self.cid
    }

    fn data(&self) -> &[u8] {
        self.data
    }

    fn links(&self) -> anyhow::Result<Vec<Cid>> {
        (self.links)()
    }
}

impl BlockStore {
    /// Create an in memory block store with the given config
    pub fn memory(config: Config) -> crate::Result<Self> {
        let mut conn = Connection::open_in_memory()?;
        init_db(&mut conn, true)?;
        Ok(Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
        })
    }

    /// Create a persistent block store with the given config
    pub fn open(path: impl AsRef<Path>, mut config: Config) -> crate::Result<Self> {
        let mut conn = Connection::open(path)?;
        init_db(&mut conn, false)?;
        let ids = in_txn(&mut conn, |txn| get_ids(txn))?;
        config.cache_tracker.retain_ids(&ids);
        Ok(Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
        })
    }

    /// Open the file at the given path for testing.
    ///
    /// This will create a writeable in-memory database that is initialized with the content
    /// of the file at the given path.
    pub fn open_test(path: impl AsRef<Path>, mut config: Config) -> crate::Result<Self> {
        let mut conn = Connection::open_in_memory()?;
        debug!(
            "Restoring in memory database from {}",
            path.as_ref().display()
        );
        conn.restore(
            DatabaseName::Main,
            path,
            Some(|p: rusqlite::backup::Progress| {
                let percent = (p.pagecount - p.remaining) * 100 / p.pagecount;
                if percent % 10 == 0 {
                    debug!("Restoring: {} %", percent);
                }
            }),
        )?;
        let ids = in_txn(&mut conn, |txn| get_ids(txn))?;
        config.cache_tracker.retain_ids(&ids);
        Ok(Self {
            conn,
            expired_temp_pins: Arc::new(Mutex::new(Vec::new())),
            config,
        })
    }

    pub fn integrity_check(&self) -> crate::Result<()> {
        let result = integrity_check(&self.conn)?;
        if result == vec!["ok".to_owned()] {
            Ok(())
        } else {
            let error_text = result.join(";");
            Err(crate::error::BlockStoreError::SqliteError(
                rusqlite::Error::SqliteFailure(rusqlite::ffi::Error::new(11), Some(error_text)),
            ))
        }
    }

    /// Get a temporary alias for safely adding blocks to the store
    pub fn temp_pin(&self) -> TempPin {
        TempPin {
            id: AtomicI64::new(0),
            expired_temp_pins: self.expired_temp_pins.clone(),
        }
    }

    /// Add a permanent named alias/pin for a root
    pub fn alias(&mut self, name: impl AsRef<[u8]>, link: Option<&Cid>) -> crate::Result<()> {
        self.alias_many(std::iter::once((name, link.cloned())))
    }

    /// Add multiple permanent named aliases
    pub fn alias_many(
        &mut self,
        aliases: impl IntoIterator<Item = (impl AsRef<[u8]>, Option<Cid>)>,
    ) -> crate::Result<()> {
        in_txn(&mut self.conn, |txn| {
            for (name, link) in aliases.into_iter() {
                let link: Option<CidBytes> = link.map(|x| CidBytes::try_from(&x)).transpose()?;
                alias(txn, name.as_ref(), link.as_ref())?;
            }
            Ok(())
        })
    }

    /// Returns the aliases referencing a block.
    pub fn reverse_alias(&mut self, cid: &Cid) -> crate::Result<Vec<Vec<u8>>> {
        let cid = CidBytes::try_from(cid)?;
        in_txn(&mut self.conn, |txn| reverse_alias(txn, cid.as_ref()))
    }

    /// Checks if the store knows about the cid.
    ///
    /// Note that this does not necessarily mean that the store has the data for the cid.
    pub fn has_cid(&self, cid: &Cid) -> Result<bool> {
        let cid = CidBytes::try_from(cid)?;
        in_ro_txn(&self.conn, |txn| has_cid(txn, cid))
    }

    /// Checks if the store has the data for a cid
    pub fn has_block(&mut self, cid: &Cid) -> Result<bool> {
        let cid = CidBytes::try_from(cid)?;
        in_ro_txn(&self.conn, |txn| has_block(txn, cid))
    }

    /// Look up multiple blocks in one read transaction
    pub fn has_blocks<I, O>(&self, cids: I) -> Result<O>
    where
        I: IntoIterator<Item = Cid>,
        O: FromIterator<(Cid, bool)>,
    {
        in_ro_txn(&self.conn, |txn| {
            cids.into_iter()
                .map(|cid| -> Result<(Cid, bool)> {
                    Ok((cid, has_block(txn, CidBytes::try_from(&cid)?)?))
                })
                .collect::<crate::Result<O>>()
        })
    }

    /// Get the stats for the store.
    ///
    /// The stats are kept up to date, so this is fast.
    pub fn get_store_stats(&self) -> Result<StoreStats> {
        in_ro_txn(&self.conn, get_store_stats)
    }

    /// Get all cids that the store knows about
    pub fn get_known_cids<C: FromIterator<Cid>>(&mut self) -> Result<C> {
        let res = in_ro_txn(&self.conn, |txn| Ok(get_known_cids::<CidBytes>(txn)?))?;
        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
        Ok(res)
    }

    /// Get all cids for which the store has blocks
    pub fn get_block_cids<C: FromIterator<Cid>>(&mut self) -> Result<C> {
        let res = in_ro_txn(&self.conn, |txn| Ok(get_block_cids::<CidBytes>(txn)?))?;
        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
        Ok(res)
    }

    /// Get descendants of a cid
    pub fn get_descendants<C: FromIterator<Cid>>(&mut self, cid: &Cid) -> Result<C> {
        let cid = CidBytes::try_from(cid)?;
        let res = in_ro_txn(&self.conn, move |txn| get_descendants(txn, cid))?;
        let res = res.iter().map(Cid::try_from).collect::<cid::Result<C>>()?;
        Ok(res)
    }

    /// Given a root of a dag, gives all cids which we do not have data for.
    pub fn get_missing_blocks<C: FromIterator<Cid>>(&mut self, cid: &Cid) -> Result<C> {
        let cid = CidBytes::try_from(cid)?;
        let result = log_execution_time("get_missing_blocks", Duration::from_millis(10), || {
            in_ro_txn(&self.conn, move |txn| get_missing_blocks(txn, cid))
        })?;
        let res = result
            .iter()
            .map(Cid::try_from)
            .collect::<cid::Result<C>>()?;
        Ok(res)
    }

    /// Do a full garbage collection.
    ///
    /// For a large block store, this can take several seconds to minutes. If that is not
    /// acceptable, consider using incremental gc.
    pub fn gc(&mut self) -> Result<()> {
        loop {
            let complete = self.incremental_gc(20000, Duration::from_secs(1))?;
            while !self.incremental_delete_orphaned(20000, Duration::from_secs(1))? {}
            if complete {
                break;
            }
        }
        Ok(())
    }

    /// Perform an incremental garbage collection.
    ///
    /// Will collect unpinned blocks until either the size targets are met again, or at minimum
    /// `min_blocks` blocks are collected. Then it will continue collecting blocks until
    /// `max_duration` has elapsed.
    ///
    /// Note that this might significantly exceed `max_duration` for various reasons. Also note
    /// that when doing incremental gc, the actual blocks are not yet deleted. So a call to this
    /// method should usually be followed by a call to `incremental_delete_orphaned`.
    ///
    /// - `min_blocks` the minimum number of blocks to collect in any case
    /// - `max_duration` the maximum duration that should be spent on gc
    ///
    /// Returns true if either size targets are met or there are no unpinned blocks left.
    pub fn incremental_gc(&mut self, min_blocks: usize, max_duration: Duration) -> Result<bool> {
        // atomically grab the expired_temp_pins until now
        let expired_temp_pins = {
            let mut result = Vec::new();
            std::mem::swap(
                self.expired_temp_pins.lock().unwrap().deref_mut(),
                &mut result,
            );
            result
        };
        Ok(log_execution_time("gc", Duration::from_secs(1), || {
            let size_targets = self.config.size_targets;
            let cache_tracker = &mut self.config.cache_tracker;
            in_txn(&mut self.conn, move |txn| {
                // get rid of dropped temp aliases, this should be fast
                for id in expired_temp_pins {
                    delete_temp_pin(txn, id)?;
                }
                Ok(incremental_gc(
                    &txn,
                    min_blocks,
                    max_duration,
                    size_targets,
                    cache_tracker,
                )?)
            })
        })?)
    }

    /// Incrementally delete orphaned blocks.
    ///
    /// Orphaned blocks are blocks for which we have deleted the metadata in `incremental_gc`.
    ///
    /// Will delete orphaned blocks until either all orphaned blocks are deleted, or at minimum
    /// `min_blocks` blocks are deleted. Then it will continue deleting blocks until `max_duration`
    /// has elapsed.
    ///
    /// Note that this might significantly exceed `max_duration` for various reasons.
    ///
    /// - `min_blocks` the minimum number of blocks to delete in any case
    /// - `max_duration` the maximum duration that should be spent on gc
    ///
    /// Returns true if all orphaned blocks are deleted.
    pub fn incremental_delete_orphaned(
        &mut self,
        min_blocks: usize,
        max_duration: Duration,
    ) -> Result<bool> {
        Ok(log_execution_time(
            "delete_orphaned",
            Duration::from_millis(100),
            || {
                in_txn(&mut self.conn, move |txn| {
                    Ok(incremental_delete_orphaned(txn, min_blocks, max_duration)?)
                })
            },
        )?)
    }

    /// Add a number of blocks to the store.
    ///
    /// It is up to the caller to extract links from blocks. Also, the store does not know
    /// anything about content-addressing and will not validate that the cid of a block is the
    /// actual hash of the content.
    ///
    /// - `blocks` the blocks to add.
    ///   Even if we already have these blocks, the alias will be set. However, it will not be
    ///   checked that the links or data are the same as last time the block was added. That is
    ///   the responsibility of the caller.
    /// - `alias` an optional temporary alias.
    ///   This can be used to incrementally add blocks without having to worry about them being
    ///   garbage collected before they can be pinned with a permanent alias.
    pub fn put_blocks<B: Block>(
        &mut self,
        blocks: impl IntoIterator<Item = B>,
        alias: Option<&TempPin>,
    ) -> Result<()> {
        let infos = in_txn(&mut self.conn, |txn| {
            let alias = alias.map(|alias| &alias.id);
            Ok(blocks
                .into_iter()
                .map(|block| {
                    let cid_bytes = CidBytes::try_from(block.cid())?;
                    let links = block
                        .links()?
                        .iter()
                        .map(CidBytes::try_from)
                        .collect::<std::result::Result<Vec<_>, cid::Error>>()?;
                    let id = put_block(txn, &cid_bytes, &block.data(), links, alias)?;
                    Ok(BlockInfo::new(id, block.cid(), block.data()))
                })
                .collect::<Result<Vec<_>>>()?)
        })?;
        self.config.cache_tracker.blocks_written(infos);
        Ok(())
    }

    /// Add a single block.
    ///
    /// This is just a convenience method that calls `put_blocks` internally.
    ///
    /// - `cid` the cid.
    ///   This should be a hash of the data, with some format specifier.
    /// - `data` a blob
    /// - `links` links extracted from the data
    /// - `alias` an optional temporary alias
    pub fn put_block<I>(
        &mut self,
        cid: &Cid,
        data: &[u8],
        links: I,
        alias: Option<&TempPin>,
    ) -> Result<()>
    where
        I: IntoIterator<Item = Cid> + Clone,
    {
        let block = BorrowedBlock::new(*cid, data, move || Ok(links.clone().into_iter().collect()));
        self.put_blocks(Some(block), alias)?;
        Ok(())
    }

    /// Get multiple blocks in a single read transaction
    pub fn get_blocks<I>(&mut self, cids: I) -> Result<impl Iterator<Item = (Cid, Option<Vec<u8>>)>>
    where
        I: IntoIterator<Item = Cid>,
    {
        let res = in_ro_txn(&self.conn, |txn| {
            cids.into_iter()
                .map(|cid| Ok((cid, get_block(txn, &CidBytes::try_from(&cid)?)?)))
                .collect::<crate::Result<Vec<_>>>()
        })?;
        let infos = res
            .iter()
            .filter_map(|(cid, res)| {
                res.as_ref()
                    .map(|(id, data)| BlockInfo::new(*id, cid, data))
            })
            .collect::<Vec<_>>();
        self.config.cache_tracker.blocks_accessed(infos);
        Ok(res
            .into_iter()
            .map(|(cid, res)| (cid, res.map(|(_, data)| data))))
    }

    /// Get data for a block.
    ///
    /// Will return None if we don't have the data.
    pub fn get_block(&mut self, cid: &Cid) -> Result<Option<Vec<u8>>> {
        Ok(self.get_blocks(std::iter::once(*cid))?.next().unwrap().1)
    }
}