use crate::error::Error;
use crate::p2p::KadResult;
use crate::path::IpfsPath;
use crate::{Block, ReceiverChannel, StoragePath};
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;
use futures::channel::{
mpsc::{channel, Receiver, Sender},
oneshot,
};
use futures::sink::SinkExt;
use futures::{StreamExt, TryStreamExt};
use libipld::cid::Cid;
use libipld::{Ipld, IpldCodec};
use libp2p::identity::PeerId;
use parking_lot::{Mutex, RwLock};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{error, fmt, io};
use tracing::log;
// Test-only module (`#[cfg(test)]`); `#[macro_use]` makes any macros it
// defines available to the code that follows in this module.
#[macro_use]
#[cfg(test)]
mod common_tests;
// Submodules. `blockstore`, `datastore` and `lock` provide the concrete
// backends that `Repo::new_fs` and `Repo::new_memory` construct.
pub mod blockstore;
pub mod datastore;
pub mod lock;
pub(crate) mod paths;
/// Outcome reported by [`BlockStore::put`] alongside the block's `Cid`.
#[derive(Debug, PartialEq, Eq)]
pub enum BlockPut {
/// The block was newly stored. `Repo::put_block` notifies waiting
/// subscribers only for this outcome.
NewBlock,
/// The store reported that the block was already present (per the variant
/// name; the exact semantics are defined by the backend).
Existed,
}
/// Successful outcome of [`BlockStore::remove`].
#[derive(Debug)]
pub enum BlockRm {
/// A block was removed; carries a `Cid` (presumably that of the removed
/// block — confirm against the backends).
Removed(Cid),
}
/// Removal-specific failure of [`BlockStore::remove`], reported inside the
/// inner `Result` of its return value.
#[derive(Debug)]
pub enum BlockRmError {
/// No block could be removed for the `Cid`; `Repo::remove_block` turns this
/// into a "block not found" error.
NotFound(Cid),
}
/// Storage backend for blocks, addressed by `Cid`.
///
/// `Repo` holds an implementation as `Arc<dyn BlockStore>` and delegates its
/// block operations to it.
#[async_trait]
pub trait BlockStore: Debug + Send + Sync + 'static {
/// Called by `Repo::init`, concurrently with the data store's `init`.
async fn init(&self) -> Result<(), Error>;
/// Called by `Repo::open`, concurrently with the data store's `open`.
async fn open(&self) -> Result<(), Error>;
/// Reports whether a block for `cid` is stored.
async fn contains(&self, cid: &Cid) -> Result<bool, Error>;
/// Returns the block for `cid`, or `None` when it is not stored.
async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error>;
/// Stores `block`, returning its `Cid` and a [`BlockPut`] outcome.
async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error>;
/// Removes the block for `cid`. The inner `Result` carries the removal
/// outcome; the outer `Err` presumably signals a backend failure (confirm
/// against implementations).
async fn remove(&self, cid: &Cid) -> Result<Result<BlockRm, BlockRmError>, Error>;
/// Lists the `Cid`s held by the store.
async fn list(&self) -> Result<Vec<Cid>, Error>;
/// Optional hook; the default implementation does nothing.
async fn wipe(&self) {}
}
/// Key/value storage backend; every implementation is also a [`PinStore`].
///
/// `Repo` holds an implementation as `Arc<dyn DataStore>` and uses it for IPNS
/// records (keys of the form `ipns/<peer id>`) and for pin bookkeeping.
#[async_trait]
pub trait DataStore: PinStore + Debug + Send + Sync + 'static {
/// Called by `Repo::init`, concurrently with the block store's `init`.
async fn init(&self) -> Result<(), Error>;
/// Called by `Repo::open`, concurrently with the block store's `open`.
async fn open(&self) -> Result<(), Error>;
/// Reports whether a value is stored under `key`.
async fn contains(&self, key: &[u8]) -> Result<bool, Error>;
/// Returns the value stored under `key`, or `None` when there is none.
async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error>;
/// Stores `value` under `key`.
async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error>;
/// Removes the entry stored under `key`.
async fn remove(&self, key: &[u8]) -> Result<(), Error>;
/// Streams `(key, value)` pairs; `Repo::migrate` uses this to copy entries
/// into another repo.
async fn iter(&self) -> futures::stream::BoxStream<'static, (Vec<u8>, Vec<u8>)>;
/// Optional hook; the default implementation does nothing.
async fn wipe(&self) {}
}
/// Failure modes of acquiring the repository lock.
#[derive(Debug)]
pub enum LockError {
    /// Another holder already owns the lock.
    RepoInUse,
    /// The lock could not be taken because of the wrapped I/O error.
    LockFileOpenFailed(io::Error),
}

impl fmt::Display for LockError {
    /// Writes a fixed, human-readable message for each variant; the wrapped
    /// I/O error is not included here and is exposed through `source()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::RepoInUse => "The repository is already being used by an IPFS instance.",
            Self::LockFileOpenFailed(_) => "Failed to open repository lock file.",
        })
    }
}

impl From<io::Error> for LockError {
    /// Maps `WouldBlock` to [`LockError::RepoInUse`]; every other kind is
    /// wrapped in [`LockError::LockFileOpenFailed`].
    fn from(error: io::Error) -> Self {
        if error.kind() == io::ErrorKind::WouldBlock {
            Self::RepoInUse
        } else {
            Self::LockFileOpenFailed(error)
        }
    }
}

impl error::Error for LockError {
    /// Returns the wrapped I/O error for `LockFileOpenFailed`, `None` otherwise.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            Self::LockFileOpenFailed(inner) => Some(inner),
            Self::RepoInUse => None,
        }
    }
}
/// Guard against concurrent use of a repository; `Repo::init` calls
/// [`Lock::try_exclusive`] before initializing the stores.
pub trait Lock: Debug + Send + Sync + 'static {
/// Attempts to take the lock, returning a [`LockError`] on failure.
fn try_exclusive(&self) -> Result<(), LockError>;
}
/// Boxed stream of referenced `Cid`s (or reference-walk errors) handed to the
/// recursive pin operations of [`PinStore`].
type References<'a> = futures::stream::BoxStream<'a, Result<Cid, crate::refs::IpldRefsError>>;
/// Pin bookkeeping backend. Required of every [`DataStore`].
#[async_trait]
pub trait PinStore: Debug + Send + Sync + Unpin + 'static {
/// Reports whether `block` is pinned; `Repo::remove_block` refuses to remove
/// blocks for which this returns `true`.
async fn is_pinned(&self, block: &Cid) -> Result<bool, Error>;
/// Records a direct pin for `target`.
async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error>;
/// Records a recursive pin for `target`; `referenced` streams the `Cid`s
/// reachable from it.
async fn insert_recursive_pin(
&self,
target: &Cid,
referenced: References<'_>,
) -> Result<(), Error>;
/// Removes a direct pin for `target`.
async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error>;
/// Removes a recursive pin for `target`; `referenced` streams the `Cid`s
/// reachable from it.
async fn remove_recursive_pin(
&self,
target: &Cid,
referenced: References<'_>,
) -> Result<(), Error>;
/// Streams pinned `Cid`s with their [`PinMode`]; `mode` is an optional
/// filter (`None` presumably means "all modes" — confirm with backends).
async fn list(
&self,
mode: Option<PinMode>,
) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>>;
/// Looks up the pin state of each of `ids`, optionally restricted to
/// `requirement`, returning the matching [`PinKind`] per `Cid`.
async fn query(
&self,
ids: Vec<Cid>,
requirement: Option<PinMode>,
) -> Result<Vec<(Cid, PinKind<Cid>)>, Error>;
}
/// The ways a block can be pinned.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PinMode {
    Indirect,
    Direct,
    Recursive,
}

/// Filter over [`PinMode`]: either one specific mode or no restriction.
#[derive(Debug, Clone, Copy)]
enum PinModeRequirement {
    Only(PinMode),
    Any,
}

impl From<Option<PinMode>> for PinModeRequirement {
    /// `Some(mode)` restricts to that mode; `None` accepts any mode.
    fn from(filter: Option<PinMode>) -> Self {
        filter.map_or(Self::Any, Self::Only)
    }
}

impl PinModeRequirement {
    /// True when the filter is unrestricted or asks specifically for indirect pins.
    fn is_indirect_or_any(&self) -> bool {
        matches!(self, Self::Any | Self::Only(PinMode::Indirect))
    }

    /// True when `other` satisfies the filter; `Any` accepts everything.
    fn matches<P: PartialEq<PinMode>>(&self, other: &P) -> bool {
        match self {
            Self::Any => true,
            Self::Only(expected) => other == expected,
        }
    }

    /// The single required mode, or `None` when any mode is acceptable.
    fn required(&self) -> Option<PinMode> {
        match *self {
            Self::Any => None,
            Self::Only(mode) => Some(mode),
        }
    }
}
/// Compares a detailed [`PinKind`] with the coarser [`PinMode`]: both
/// recursive kinds (`Recursive` and `RecursiveIntention`) count as
/// `PinMode::Recursive`.
impl<B: Borrow<Cid>> PartialEq<PinMode> for PinKind<B> {
    fn eq(&self, other: &PinMode) -> bool {
        match self {
            PinKind::IndirectFrom(_) => matches!(other, PinMode::Indirect),
            PinKind::Direct => matches!(other, PinMode::Direct),
            PinKind::Recursive(_) | PinKind::RecursiveIntention => {
                matches!(other, PinMode::Recursive)
            }
        }
    }
}
/// Detailed pin state of a block, generic over how the referencing `Cid` is
/// held (owned or borrowed).
#[derive(Debug, PartialEq, Eq)]
pub enum PinKind<C: Borrow<Cid>> {
/// Pinned indirectly; carries a `Cid` (presumably the pin root that
/// references this block — confirm).
IndirectFrom(C),
/// Pinned directly.
Direct,
/// Pinned recursively; the `u64` is presumably a count of referenced
/// blocks — confirm against the datastore implementations.
Recursive(u64),
/// A recursive pin that compares equal to `PinMode::Recursive`; the exact
/// meaning of "intention" is not visible in this file.
RecursiveIntention,
}
impl<C: Borrow<Cid>> PinKind<C> {
fn as_ref(&self) -> PinKind<&'_ Cid> {
use PinKind::*;
match self {
IndirectFrom(c) => PinKind::IndirectFrom(c.borrow()),
Direct => PinKind::Direct,
Recursive(count) => PinKind::Recursive(*count),
RecursiveIntention => PinKind::RecursiveIntention,
}
}
}
/// Handle to a repository: a block store, a data store (which also tracks
/// pins), a lock, and the plumbing used to wait for blocks that are not yet
/// stored.
///
/// Every field is behind an `Arc`, so the derived `Clone` yields another
/// handle to the same underlying state.
// The lint is silenced because of the nested type of `subscriptions`.
#[allow(clippy::type_complexity)]
#[derive(Debug, Clone)]
pub struct Repo {
// Toggled by `set_online` / `set_offline`, read by `is_online`.
online: Arc<AtomicBool>,
// Set once `init` has completed successfully.
initialized: Arc<AtomicBool>,
block_store: Arc<dyn BlockStore>,
data_store: Arc<dyn DataStore>,
// Sender half installed by `initialize_channel` and removed by `shutdown`.
events: Arc<RwLock<Option<Sender<RepoEvent>>>>,
// Waiters per `Cid`; `put_block` completes them when a new block is stored
// and `shutdown` drops them all.
pub(crate) subscriptions:
Arc<Mutex<HashMap<Cid, Vec<futures::channel::oneshot::Sender<Result<Block, String>>>>>>,
// Tried exclusively in `init`.
lockfile: Arc<dyn Lock>,
}
/// Lets the bitswap implementation read blocks straight from this repo's
/// block store.
#[async_trait]
impl beetle_bitswap_next::Store for Repo {
    /// Returns the length in bytes of the data of the block stored for `cid`.
    ///
    /// # Errors
    /// Fails when the lookup fails or when no block is stored for `cid`.
    async fn get_size(&self, cid: &Cid) -> anyhow::Result<usize> {
        let block = self
            .get_block_now(cid)
            .await?
            // Build the error lazily: constructing an `anyhow::Error` is not
            // free and is only needed on the miss path.
            .ok_or_else(|| anyhow::anyhow!("Block doesnt exist"))?;
        Ok(block.data().len())
    }

    /// Returns a copy of the block stored for `cid` in bitswap's block type.
    ///
    /// # Errors
    /// Fails when the lookup fails or when no block is stored for `cid`.
    async fn get(&self, cid: &Cid) -> anyhow::Result<beetle_bitswap_next::Block> {
        let block = self
            .get_block_now(cid)
            .await?
            .ok_or_else(|| anyhow::anyhow!("Block doesnt exist"))?;
        Ok(beetle_bitswap_next::Block {
            cid: *block.cid(),
            data: bytes::Bytes::copy_from_slice(block.data()),
        })
    }

    /// Reports whether the block store contains `cid`.
    async fn has(&self, cid: &Cid) -> anyhow::Result<bool> {
        self.contains(cid).await
    }
}
/// Events sent over the channel created by `Repo::initialize_channel`.
#[derive(Debug)]
pub enum RepoEvent {
/// Emitted by `Repo::get_block_with_session` when a block is not stored
/// locally: optional session id, the wanted `Cid`, and the peers supplied
/// by the caller.
WantBlock(Option<u64>, Cid, Vec<PeerId>),
/// Not emitted in this file; presumably cancels an earlier want — confirm.
UnwantBlock(Cid),
/// Not emitted in this file; carries a block and a oneshot sender through
/// which the receiver of the event can hand back a result.
NewBlock(
Block,
oneshot::Sender<Result<ReceiverChannel<KadResult>, anyhow::Error>>,
),
/// Emitted by `Repo::remove_block` after a block has been removed.
RemovedBlock(Cid),
}
impl Repo {
pub fn new(repo_type: StoragePath) -> Self {
match repo_type {
StoragePath::Memory => Repo::new_memory(),
StoragePath::Disk(path) => Repo::new_fs(path),
StoragePath::Custom {
blockstore,
datastore,
lock,
} => Repo::new_raw(blockstore, datastore, lock),
}
}
pub fn new_raw(
block_store: Arc<dyn BlockStore>,
data_store: Arc<dyn DataStore>,
lockfile: Arc<dyn Lock>,
) -> Self {
Repo {
initialized: Arc::default(),
online: Arc::default(),
block_store,
data_store,
events: Arc::default(),
subscriptions: Default::default(),
lockfile,
}
}
/// Builds a filesystem-backed repo rooted at `path`.
///
/// The block store, data store and lock file live at `blockstore`,
/// `datastore` and `repo_lock` below `path`. The data store backend is
/// selected by the `sled_data_store` feature.
pub fn new_fs(path: impl AsRef<Path>) -> Self {
    let root = path.as_ref();
    let block_store = Arc::new(blockstore::flatfs::FsBlockStore::new(
        root.join("blockstore"),
    ));
    #[cfg(not(feature = "sled_data_store"))]
    let data_store = Arc::new(datastore::flatfs::FsDataStore::new(root.join("datastore")));
    #[cfg(feature = "sled_data_store")]
    let data_store = Arc::new(datastore::sled::SledDataStore::new(root.join("datastore")));
    let lockfile = Arc::new(lock::FsLock::new(root.join("repo_lock")));
    Self::new_raw(block_store, data_store, lockfile)
}
/// Builds a repo from the in-memory block store, data store and lock.
pub fn new_memory() -> Self {
    Self::new_raw(
        Arc::new(blockstore::memory::MemBlockStore::new(Default::default())),
        Arc::new(datastore::memory::MemDataStore::new(Default::default())),
        Arc::new(lock::MemLock),
    )
}
/// Copies blocks, datastore entries and pins from this repo into `repo`.
///
/// # Errors
/// Returns an error only when either repo is online. Failures for
/// individual blocks, entries or pins are logged with `error!` and skipped,
/// so `Ok(())` does not guarantee that everything was copied.
pub async fn migrate(&self, repo: &Self) -> Result<(), Error> {
if self.is_online() || repo.is_online() {
anyhow::bail!("Repository cannot be online");
}
// Copies every listed block into the target's block store.
let block_migration = {
let this = self.clone();
let external = repo.clone();
async move {
// A failure to list the blocks is ignored without logging.
if let Ok(list) = this.list_blocks().await {
for cid in list {
match this.get_block_now(&cid).await {
Ok(Some(block)) => match external.block_store.put(block).await {
Ok(_) => {}
Err(e) => error!("Error migrating {cid}: {e}"),
},
Ok(None) => error!("{cid} doesnt exist"),
Err(e) => error!("Error getting block {cid}: {e}"),
}
}
}
}
};
// Copies every key/value pair yielded by the source data store.
let data_migration = {
let this = self.clone();
let external = repo.clone();
async move {
let mut data_stream = this.data_store().iter().await;
while let Some((k, v)) = data_stream.next().await {
if let Err(e) = external.data_store().put(&k, &v).await {
error!("Unable to migrate {k:?} into repo: {e}");
}
}
}
};
// Re-creates direct and recursive pins on the target.
let pins_migration = {
let this = self.clone();
let external = repo.clone();
async move {
let mut stream = this.data_store().list(None).await;
// NOTE(review): this pattern ends the loop at the first `Err` yielded by
// the pin stream as well as at the end of the stream, so pins after an
// erroring item are not visited.
while let Some(Ok((cid, pin_mode))) = stream.next().await {
match pin_mode {
PinMode::Direct => {
match external.data_store().insert_direct_pin(&cid).await {
Ok(_) => {}
Err(e) => error!("Unable to migrate pin {cid}: {e}"),
}
}
// Indirect pins are not copied on their own; presumably they are
// re-created by the recursive pin insertion below — confirm.
PinMode::Indirect => {
continue;
}
PinMode::Recursive => {
let block = match this.get_block_now(&cid).await.map(|block| {
block.and_then(|block| block.decode::<IpldCodec, Ipld>().ok())
}) {
Ok(Some(block)) => block,
// Block missing or not decodable as IPLD: skipped without logging.
Ok(None) => continue,
// NOTE(review): this arm is reached when the lookup itself fails,
// although the message says the block does not exist.
Err(e) => {
error!("Block {cid} does not exist but is pinned: {e}");
continue;
}
};
// References are resolved against `self`, the source repo.
let st = crate::refs::IpldRefs::default()
.with_only_unique()
.refs_of_resolved(self, vec![(cid, block.clone())].into_iter())
.map_ok(|crate::refs::Edge { destination, .. }| destination)
.into_stream()
.boxed();
if let Err(e) = external.insert_recursive_pin(&cid, st).await {
error!("Error migrating pin {cid}: {e}");
continue;
}
}
}
}
}
};
// The three migrations are polled concurrently on the current task.
futures::join!(block_migration, data_migration, pins_migration);
Ok(())
}
/// Installs a fresh event channel (buffer size 1), marks the repo online
/// and returns the receiving half.
///
/// Debug builds assert that no channel was installed before.
pub(crate) fn initialize_channel(&self) -> Receiver<RepoEvent> {
    let mut slot = self.events.write();
    debug_assert!(slot.is_none());
    let (tx, rx) = channel(1);
    *slot = Some(tx);
    self.set_online();
    rx
}
/// Drops all pending block subscriptions, closes and removes the event
/// channel if one is installed, and marks the repo offline.
pub fn shutdown(&self) {
    // Dropping the senders makes every pending waiter observe a cancellation.
    self.subscriptions.lock().clear();

    let sender = self.events.write().take();
    if let Some(mut tx) = sender {
        tx.close_channel();
    }

    self.set_offline();
}
/// Returns the current value of the online flag.
pub fn is_online(&self) -> bool {
self.online.load(Ordering::SeqCst)
}
/// Raises the online flag if it is not already set.
pub(crate) fn set_online(&self) {
    if !self.is_online() {
        self.online.store(true, Ordering::SeqCst);
    }
}
/// Clears the online flag if it is currently set.
pub(crate) fn set_offline(&self) {
    if self.is_online() {
        self.online.store(false, Ordering::SeqCst);
    }
}
/// Returns a clone of the event sender, or `None` when no channel is
/// currently installed.
fn repo_channel(&self) -> Option<Sender<RepoEvent>> {
self.events.read().clone()
}
/// Takes the repository lock and initializes both stores.
///
/// Returns immediately when a previous call already succeeded. The two
/// stores are initialized concurrently; the repo is marked initialized only
/// when both succeed.
///
/// # Errors
/// Fails when the lock cannot be taken or when a store fails to
/// initialize; a block store error takes precedence over a data store error.
pub async fn init(&self) -> Result<(), Error> {
    if self.initialized.load(Ordering::SeqCst) {
        return Ok(());
    }

    log::debug!("Trying lockfile");
    self.lockfile.try_exclusive()?;
    log::debug!("lockfile tried");

    let (blocks, data) =
        futures::future::join(self.block_store.init(), self.data_store.init()).await;
    blocks?;
    data?;

    self.initialized.store(true, Ordering::SeqCst);
    Ok(())
}
/// Opens both stores concurrently.
///
/// # Errors
/// Returns the block store's error if it failed, otherwise the data
/// store's result.
pub async fn open(&self) -> Result<(), Error> {
    let (blocks, data) =
        futures::future::join(self.block_store.open(), self.data_store.open()).await;
    blocks.and(data)
}
/// Stores `block` and, when the store reports it as new, hands a copy to
/// every subscriber currently waiting for its `Cid`.
///
/// Returns the block's `Cid` together with the store's [`BlockPut`] outcome.
///
/// # Errors
/// Propagates any error from the block store; no subscriber is notified then.
pub async fn put_block(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
    let (cid, res) = self.block_store.put(block.clone()).await?;

    if res == BlockPut::NewBlock {
        // The lock is held only for the removal, not while notifying.
        let waiters = self.subscriptions.lock().remove(&cid);
        for waiter in waiters.into_iter().flatten() {
            // A oneshot send is synchronous and never blocks, so no task is
            // spawned for it; this also avoids needing a tokio runtime here.
            // A failed send just means the waiter is gone.
            let _ = waiter.send(Ok(block.clone()));
        }
    }

    Ok((cid, res))
}
/// Retrieves the block for `cid` without a session id; see
/// `get_block_with_session`, to which this forwards with `None`.
#[inline]
pub async fn get_block(
&self,
cid: &Cid,
peers: &[PeerId],
local_only: bool,
) -> Result<Block, Error> {
self.get_block_with_session(None, cid, peers, local_only)
.await
}
/// Retrieves the block for `cid`, waiting for it to arrive if it is not
/// stored locally.
///
/// When the block is not in the block store and the lookup is allowed to
/// go further, a subscription for `cid` is registered, a
/// [`RepoEvent::WantBlock`] carrying `session` and `peers` is emitted, and
/// the call waits until `put_block` delivers the block.
///
/// # Errors
/// - the block store lookup fails;
/// - the block is not stored and either `local_only` is set or the repo is
///   offline;
/// - no event channel is installed;
/// - the subscription is dropped before a block arrives (for example by
///   `shutdown`), or it is completed with an error message.
pub(crate) async fn get_block_with_session(
    &self,
    session: Option<u64>,
    cid: &Cid,
    peers: &[PeerId],
    local_only: bool,
) -> Result<Block, Error> {
    if let Some(block) = self.get_block_now(cid).await? {
        return Ok(block);
    }

    if local_only || !self.is_online() {
        anyhow::bail!("Unable to locate block {cid}");
    }

    // Resolve the channel before subscribing: bailing out afterwards would
    // leave a sender in `subscriptions` that nobody is waiting on.
    let mut events = self
        .repo_channel()
        .ok_or_else(|| anyhow::anyhow!("Channel is not available"))?;

    let (tx, rx) = futures::channel::oneshot::channel();
    self.subscriptions.lock().entry(*cid).or_default().push(tx);

    // Best effort: a failed send is ignored and the wait below still ends
    // once the subscription is completed or dropped.
    events
        .send(RepoEvent::WantBlock(session, *cid, peers.to_vec()))
        .await
        .ok();

    rx.await?.map_err(|e| anyhow!("{e}"))
}
/// Looks `cid` up in the block store and returns whatever it reports,
/// without subscribing to or requesting the block.
pub async fn get_block_now(&self, cid: &Cid) -> Result<Option<Block>, Error> {
self.block_store.get(cid).await
}
/// Reports whether the block store contains `cid`.
pub async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
self.block_store.contains(cid).await
}
/// Returns the `Cid`s listed by the block store.
pub async fn list_blocks(&self) -> Result<Vec<Cid>, Error> {
self.block_store.list().await
}
/// Removes the block for `cid` from the block store unless it is pinned.
///
/// After a successful removal a [`RepoEvent::RemovedBlock`] is emitted if an
/// event channel is installed. The notification is best effort: a missing or
/// closed channel does not turn a completed removal into an error.
///
/// # Errors
/// - the pin lookup or the block store fails;
/// - the block is pinned;
/// - the block store reports that the block was not found.
pub async fn remove_block(&self, cid: &Cid) -> Result<Cid, Error> {
    if self.is_pinned(cid).await? {
        return Err(anyhow::anyhow!("block to remove is pinned"));
    }

    match self.block_store.remove(cid).await? {
        Ok(BlockRm::Removed(_)) => {
            // The block is already gone here, so the result must be `Ok`
            // even when nobody can be notified (e.g. the repo is offline).
            if let Some(mut events) = self.repo_channel() {
                events.send(RepoEvent::RemovedBlock(*cid)).await.ok();
            }
            Ok(*cid)
        }
        Err(BlockRmError::NotFound(_)) => Err(anyhow::anyhow!("block not found")),
    }
}
/// Reads the path stored for `ipns` under the data store key
/// `ipns/<peer id>`.
///
/// Returns `Ok(None)` when nothing is stored. Stored bytes are decoded as
/// UTF-8 lossily before being parsed as an `IpfsPath`.
///
/// # Errors
/// Fails when the data store lookup fails or the stored value does not
/// parse as an `IpfsPath`.
pub async fn get_ipns(&self, ipns: &PeerId) -> Result<Option<IpfsPath>, Error> {
    use std::str::FromStr;

    let record_key = format!("ipns/{ipns}");
    let stored = self.data_store.get(record_key.as_bytes()).await?;

    if let Some(raw) = stored {
        let text = String::from_utf8_lossy(&raw);
        Ok(Some(IpfsPath::from_str(&text)?))
    } else {
        Ok(None)
    }
}
/// Stores the string form of `path` under the data store key
/// `ipns/<peer id>`.
pub async fn put_ipns(&self, ipns: &PeerId, path: &IpfsPath) -> Result<(), Error> {
    let record_key = format!("ipns/{ipns}");
    let encoded = path.to_string();
    self.data_store
        .put(record_key.as_bytes(), encoded.as_bytes())
        .await
}
/// Removes the entry stored under the data store key `ipns/<peer id>`.
pub async fn remove_ipns(&self, ipns: &PeerId) -> Result<(), Error> {
    let record_key = format!("ipns/{ipns}");
    let key_bytes = record_key.as_bytes();
    self.data_store.remove(key_bytes).await
}
/// Pins `cid`, either directly or recursively.
///
/// The block is fetched first via `get_block` (with no peers, honouring
/// `local_only`). For a recursive pin the block is decoded as IPLD and the
/// unique references reachable from it are streamed to the pin store.
///
/// # Errors
/// Fails when the block cannot be obtained, cannot be decoded (recursive
/// only), or the pin store rejects the pin.
pub async fn insert_pin(
    &self,
    cid: &Cid,
    recursive: bool,
    local_only: bool,
) -> Result<(), Error> {
    let block = self.get_block(cid, &[], local_only).await?;

    if recursive {
        let ipld = block.decode::<IpldCodec, Ipld>()?;
        let references = crate::refs::IpldRefs::default()
            .with_only_unique()
            .refs_of_resolved(self, vec![(*cid, ipld.clone())].into_iter())
            .map_ok(|crate::refs::Edge { destination, .. }| destination)
            .into_stream()
            .boxed();
        self.insert_recursive_pin(cid, references).await
    } else {
        self.insert_direct_pin(cid).await
    }
}
/// Forwards to the data store's `insert_direct_pin` for `cid`.
pub async fn insert_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
self.data_store.insert_direct_pin(cid).await
}
/// Forwards to the data store's `insert_recursive_pin`, passing `refs` as
/// the stream of `Cid`s referenced from `cid`.
pub async fn insert_recursive_pin(&self, cid: &Cid, refs: References<'_>) -> Result<(), Error> {
self.data_store.insert_recursive_pin(cid, refs).await
}
/// Forwards to the data store's `remove_direct_pin` for `cid`.
pub async fn remove_direct_pin(&self, cid: &Cid) -> Result<(), Error> {
self.data_store.remove_direct_pin(cid).await
}
/// Forwards to the data store's `remove_recursive_pin`, passing `refs` as
/// the stream of `Cid`s referenced from `cid`.
pub async fn remove_recursive_pin(&self, cid: &Cid, refs: References<'_>) -> Result<(), Error> {
self.data_store.remove_recursive_pin(cid, refs).await
}
/// Removes every block that is not pinned and returns the `Cid`s that were
/// actually removed.
///
/// # Errors
/// Fails when listing the blocks or checking a pin fails. A failure to
/// remove an individual block is ignored and that block is simply left out
/// of the result.
pub async fn cleanup(&self) -> Result<Vec<Cid>, Error> {
    let mut removed = Vec::new();

    for candidate in self.list_blocks().await? {
        if self.is_pinned(&candidate).await? {
            continue;
        }
        if let Ok(gone) = self.remove_block(&candidate).await {
            removed.push(gone);
        }
    }

    Ok(removed)
}
/// Asks the data store whether `cid` is pinned.
pub async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
self.data_store.is_pinned(cid).await
}
/// Returns the data store's stream of pinned `Cid`s with their [`PinMode`],
/// passing `mode` through as the filter.
pub async fn list_pins(
&self,
mode: Option<PinMode>,
) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
self.data_store.list(mode).await
}
/// Forwards to the data store's pin `query` for `cids`, passing
/// `requirement` through as the optional mode restriction.
pub async fn query_pins(
&self,
cids: Vec<Cid>,
requirement: Option<PinMode>,
) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
self.data_store.query(cids, requirement).await
}
}
impl Repo {
/// Borrows the data store as a trait object; `migrate` uses this to reach
/// both the key/value and the pin operations.
pub fn data_store(&self) -> &dyn DataStore {
&*self.data_store
}
}