use cid::Cid;
use ferripfs_blockstore::Blockstore;
use std::collections::HashSet;
use crate::{PinError, PinInfo, PinMode, PinResult, PinStore};
/// Operations for pinning content-addressed blocks.
///
/// Pins are tracked per CID together with a [`PinMode`] and an optional name.
/// The notes on individual methods describe the behaviour of the in-file
/// `BlockstorePinner` implementation; other implementors may differ.
pub trait Pinner {
/// Reports whether the pin store has a pin for `cid`.
fn is_pinned(&self, cid: &Cid) -> PinResult<bool>;
/// Reports whether the pin store has a pin for `cid` with the given `mode`.
fn is_pinned_with_mode(&self, cid: &Cid, mode: PinMode) -> PinResult<bool>;
/// Pins `cid` with the given `mode` and no name.
fn pin(&mut self, cid: &Cid, mode: PinMode) -> PinResult<()>;
/// Pins `cid` with the given `mode`, attaching the optional `name`.
///
/// `BlockstorePinner` rejects `PinMode::Indirect` here and fails with
/// `PinError::BlockNotFound` when the block for `cid` is absent.
fn pin_with_name(&mut self, cid: &Cid, mode: PinMode, name: Option<String>) -> PinResult<()>;
/// Removes the pin on `cid`.
///
/// `recursive` must be `true` to remove a recursive pin in `BlockstorePinner`;
/// indirect pins cannot be removed through this method.
fn unpin(&mut self, cid: &Cid, recursive: bool) -> PinResult<()>;
/// Looks up the pin record for `cid`, if one exists.
fn get_pin(&self, cid: &Cid) -> PinResult<Option<PinInfo>>;
/// Lists pin records; `mode` is forwarded to the store as a filter
/// (`None` is what `verify` and `pinned_cids` use to walk every record).
fn list_pins(&self, mode: Option<PinMode>) -> PinResult<Vec<PinInfo>>;
/// Pins `new` using the mode and name recorded for `old`, and additionally
/// unpins `old` when `unpin` is `true`.
fn update_pin(&mut self, old: &Cid, new: &Cid, unpin: bool) -> PinResult<()>;
/// Checks pinned content and returns one `(cid, description)` entry per
/// problem found; an empty vector means no problems were detected.
fn verify(&self) -> PinResult<Vec<(Cid, String)>>;
/// Returns the set of CIDs that currently have a pin record.
fn pinned_cids(&self) -> PinResult<HashSet<Cid>>;
}
/// [`Pinner`] implementation that keeps pin records in a [`PinStore`] and
/// consults a borrowed [`Blockstore`] to check block presence and to read
/// block data when following links.
pub struct BlockstorePinner<'a, B: Blockstore> {
// Borrowed for the pinner's lifetime; only read through `has`/`get` in this file.
blockstore: &'a B,
// Owned pin records (direct, recursive and indirect pins).
store: PinStore,
}
impl<'a, B: Blockstore> BlockstorePinner<'a, B> {
    /// Creates a pinner that reads blocks from `blockstore` and records pins in `store`.
    pub fn new(blockstore: &'a B, store: PinStore) -> Self {
        Self { blockstore, store }
    }

    /// Returns a shared reference to the underlying pin store.
    pub fn store(&self) -> &PinStore {
        &self.store
    }

    /// Returns a mutable reference to the underlying pin store.
    pub fn store_mut(&mut self) -> &mut PinStore {
        &mut self.store
    }

    /// Adds every CID reachable from `cid` through block links to `refs`.
    ///
    /// `cid` itself is not added. If `refs` already contains `cid` the call
    /// returns immediately without reading anything. Blocks whose payload
    /// cannot be decoded by `extract_links` are treated as leaves.
    ///
    /// # Errors
    ///
    /// Returns `PinError::BlockNotFound` when a visited block is missing from
    /// the blockstore, and propagates blockstore read errors. On error `refs`
    /// keeps whatever was collected up to that point.
    fn collect_refs(&self, cid: &Cid, refs: &mut HashSet<Cid>) -> PinResult<()> {
        if refs.contains(cid) {
            return Ok(());
        }

        // Explicit work stack instead of recursion: traversal depth then does not
        // depend on the call stack, however deep the linked structure is.
        let mut pending = vec![*cid];
        while let Some(current) = pending.pop() {
            let block = self
                .blockstore
                .get(&current)?
                .ok_or_else(|| PinError::BlockNotFound(current.to_string()))?;

            if let Ok(links) = extract_links(block.data()) {
                for link_cid in links {
                    // `insert` returns true only for CIDs not seen before, so each
                    // block is expanded once. The previous code inserted the link and
                    // then hit its own "already seen" guard on the recursive call,
                    // which meant nothing below the first level was ever visited.
                    if refs.insert(link_cid) {
                        pending.push(link_cid);
                    }
                }
            }
        }
        Ok(())
    }
}
impl<'a, B: Blockstore> Pinner for BlockstorePinner<'a, B> {
fn is_pinned(&self, cid: &Cid) -> PinResult<bool> {
Ok(self.store.is_pinned(cid))
}
fn is_pinned_with_mode(&self, cid: &Cid, mode: PinMode) -> PinResult<bool> {
Ok(self.store.is_pinned_with_mode(cid, mode))
}
fn pin(&mut self, cid: &Cid, mode: PinMode) -> PinResult<()> {
self.pin_with_name(cid, mode, None)
}
fn pin_with_name(&mut self, cid: &Cid, mode: PinMode, name: Option<String>) -> PinResult<()> {
if !self.blockstore.has(cid)? {
return Err(PinError::BlockNotFound(cid.to_string()));
}
match mode {
PinMode::Direct => {
self.store.add_direct(cid, name);
}
PinMode::Recursive => {
let mut refs = HashSet::new();
self.collect_refs(cid, &mut refs)?;
for ref_cid in &refs {
self.store.add_indirect(ref_cid, cid);
}
self.store.add_recursive(cid, name);
}
PinMode::Indirect => {
return Err(PinError::AlreadyPinned(
"Cannot create indirect pin directly".to_string(),
));
}
}
Ok(())
}
fn unpin(&mut self, cid: &Cid, recursive: bool) -> PinResult<()> {
let pin_info = self.store.get(cid);
match pin_info {
Some(info) => match info.mode {
PinMode::Direct => {
self.store.remove_direct(cid);
Ok(())
}
PinMode::Recursive => {
if !recursive {
return Err(PinError::NotPinned(format!(
"{} is pinned recursively, use recursive unpin",
cid
)));
}
let mut refs = HashSet::new();
let _ = self.collect_refs(cid, &mut refs);
for ref_cid in &refs {
self.store.remove_indirect(ref_cid, cid);
}
self.store.remove_recursive(cid);
Ok(())
}
PinMode::Indirect => Err(PinError::CannotUnpinIndirect(cid.to_string())),
},
None => Err(PinError::NotPinned(cid.to_string())),
}
}
fn get_pin(&self, cid: &Cid) -> PinResult<Option<PinInfo>> {
Ok(self.store.get(cid))
}
fn list_pins(&self, mode: Option<PinMode>) -> PinResult<Vec<PinInfo>> {
Ok(self.store.list(mode))
}
fn update_pin(&mut self, old: &Cid, new: &Cid, unpin: bool) -> PinResult<()> {
let old_info = self
.store
.get(old)
.ok_or_else(|| PinError::NotPinned(old.to_string()))?;
if !self.blockstore.has(new)? {
return Err(PinError::BlockNotFound(new.to_string()));
}
self.pin_with_name(new, old_info.mode, old_info.name)?;
if unpin {
let recursive = old_info.mode == PinMode::Recursive;
self.unpin(old, recursive)?;
}
Ok(())
}
fn verify(&self) -> PinResult<Vec<(Cid, String)>> {
let mut errors = Vec::new();
for pin in self.store.list(None) {
if pin.mode == PinMode::Indirect {
continue; }
let cid =
Cid::try_from(pin.cid.as_str()).map_err(|e| PinError::CidParse(e.to_string()))?;
match self.blockstore.has(&cid) {
Ok(true) => {}
Ok(false) => {
errors.push((cid, "block not found".to_string()));
}
Err(e) => {
errors.push((cid, format!("error checking block: {}", e)));
}
}
if pin.mode == PinMode::Recursive {
let mut refs = HashSet::new();
if let Err(e) = self.collect_refs(&cid, &mut refs) {
errors.push((cid, format!("error collecting refs: {}", e)));
} else {
for ref_cid in refs {
match self.blockstore.has(&ref_cid) {
Ok(true) => {}
Ok(false) => {
errors.push((ref_cid, "referenced block not found".to_string()));
}
Err(e) => {
errors.push((ref_cid, format!("error checking block: {}", e)));
}
}
}
}
}
}
Ok(errors)
}
fn pinned_cids(&self) -> PinResult<HashSet<Cid>> {
let mut cids = HashSet::new();
for pin in self.store.list(None) {
let cid =
Cid::try_from(pin.cid.as_str()).map_err(|e| PinError::CidParse(e.to_string()))?;
cids.insert(cid);
}
Ok(cids)
}
}
/// Decodes `data` as a protobuf node and returns the CIDs of its links.
///
/// The message layout (optional `data` bytes at tag 1, repeated links at
/// tag 2, each link with hash/name/tsize at tags 1-3) mirrors the DAG-PB
/// `PBNode`/`PBLink` schema. Links without a hash, or whose hash bytes do not
/// parse as a CID, are skipped.
///
/// Returns `Err(())` when `data` does not decode as such a message.
fn extract_links(data: &[u8]) -> Result<Vec<Cid>, ()> {
    use prost::Message;

    /// One outgoing link of a node.
    #[derive(Clone, PartialEq, Message)]
    struct PbLink {
        #[prost(bytes, optional, tag = "1")]
        hash: Option<Vec<u8>>,
        #[prost(string, optional, tag = "2")]
        name: Option<String>,
        #[prost(uint64, optional, tag = "3")]
        tsize: Option<u64>,
    }

    /// A node: its links plus an optional opaque payload.
    #[derive(Clone, PartialEq, Message)]
    struct PbNode {
        #[prost(message, repeated, tag = "2")]
        links: Vec<PbLink>,
        #[prost(bytes, optional, tag = "1")]
        data: Option<Vec<u8>>,
    }

    PbNode::decode(data)
        .map(|node| {
            node.links
                .into_iter()
                // Drop links that carry no hash at all.
                .filter_map(|link| link.hash)
                // Drop hashes that are not valid binary CIDs.
                .filter_map(|raw| Cid::try_from(raw).ok())
                .collect::<Vec<Cid>>()
        })
        .map_err(|_| ())
}
#[cfg(test)]
mod tests {
    use super::*;
    use ferripfs_blockstore::{create_cid_v0, Block, FlatFsBlockstore};
    use tempfile::tempdir;

    /// Wraps `data` in a block addressed by its CIDv0.
    fn create_test_block(data: &[u8]) -> Block {
        Block::new(create_cid_v0(data).unwrap(), data.to_vec())
    }

    /// Opens a flat-fs blockstore under a fresh temporary directory.
    ///
    /// The directory guard is returned so the caller keeps it alive for the
    /// whole test.
    fn temp_blockstore() -> (tempfile::TempDir, FlatFsBlockstore) {
        let dir = tempdir().unwrap();
        let bs = FlatFsBlockstore::new_default(dir.path().join("blocks")).unwrap();
        (dir, bs)
    }

    /// Stores a block holding `data` and returns its CID.
    fn store_block(bs: &mut FlatFsBlockstore, data: &[u8]) -> Cid {
        let block = create_test_block(data);
        let cid = *block.cid();
        bs.put(block).unwrap();
        cid
    }

    /// A direct pin is visible only in direct mode and disappears after unpin.
    #[test]
    fn test_direct_pin() {
        let (_dir, mut bs) = temp_blockstore();
        let cid = store_block(&mut bs, b"test data");
        let mut pinner = BlockstorePinner::new(&bs, PinStore::new());

        pinner.pin(&cid, PinMode::Direct).unwrap();
        assert!(pinner.is_pinned(&cid).unwrap());
        assert!(pinner.is_pinned_with_mode(&cid, PinMode::Direct).unwrap());
        assert!(!pinner
            .is_pinned_with_mode(&cid, PinMode::Recursive)
            .unwrap());

        pinner.unpin(&cid, false).unwrap();
        assert!(!pinner.is_pinned(&cid).unwrap());
    }

    /// The name given at pin time is returned with the pin record.
    #[test]
    fn test_pin_with_name() {
        let (_dir, mut bs) = temp_blockstore();
        let cid = store_block(&mut bs, b"test data");
        let mut pinner = BlockstorePinner::new(&bs, PinStore::new());

        pinner
            .pin_with_name(&cid, PinMode::Direct, Some("my-pin".to_string()))
            .unwrap();

        let info = pinner.get_pin(&cid).unwrap().unwrap();
        assert_eq!(info.name, Some("my-pin".to_string()));
    }

    /// Two direct pins show up both unfiltered and under the direct filter.
    #[test]
    fn test_list_pins() {
        let (_dir, mut bs) = temp_blockstore();
        let cid1 = store_block(&mut bs, b"data 1");
        let cid2 = store_block(&mut bs, b"data 2");
        let mut pinner = BlockstorePinner::new(&bs, PinStore::new());

        pinner.pin(&cid1, PinMode::Direct).unwrap();
        pinner.pin(&cid2, PinMode::Direct).unwrap();

        let all_pins = pinner.list_pins(None).unwrap();
        assert_eq!(all_pins.len(), 2);

        let direct_pins = pinner.list_pins(Some(PinMode::Direct)).unwrap();
        assert_eq!(direct_pins.len(), 2);
    }
}