use crate::feed_builder::FeedBuilder;
use crate::replicate::{Message, Peer};
pub use crate::storage::{Node, NodeTrait, Storage, Store};
use crate::audit::Audit;
use crate::bitfield::Bitfield;
use crate::crypto::{
generate_keypair, sign, verify, Hash, Merkle, PublicKey, SecretKey, Signature,
};
use crate::proof::Proof;
use anyhow::{bail, ensure, Result};
use flat_tree as flat;
use pretty_hash::fmt as pretty_fmt;
use random_access_disk::RandomAccessDisk;
use random_access_memory::RandomAccessMemory;
use random_access_storage::RandomAccess;
use tree_index::TreeIndex;
use std::borrow::Borrow;
use std::cmp;
use std::fmt::{self, Debug, Display};
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
/// An append-only feed: keys, backing storage and the in-memory indexes that
/// describe which blocks and tree nodes are available locally.
#[derive(Debug)]
pub struct Feed<T>
where
T: RandomAccess<Error = Box<dyn std::error::Error + Send + Sync>> + Debug,
{
/// Merkle tree generator; `append` feeds every new block into it.
pub(crate) merkle: Merkle,
/// Public key used when signing appended blocks and verifying signatures.
pub(crate) public_key: PublicKey,
/// Secret key used to sign appended blocks; `append` fails when it is `None`.
pub(crate) secret_key: Option<SecretKey>,
/// Backing storage for data, tree nodes, signatures, the bitfield and keys.
pub(crate) storage: Storage<T>,
/// Total number of bytes in the feed; `append` adds the block size to it.
pub(crate) byte_length: u64,
/// Number of blocks in the feed; `append` increments it by one.
pub(crate) length: u64,
/// Marks which block indexes have data available locally.
pub(crate) bitfield: Bitfield,
/// Marks which flat-tree nodes are available locally.
pub(crate) tree: TreeIndex,
/// Peers notified by `announce`, `unannounce` and `update_peers`.
pub(crate) peers: Vec<Peer>,
}
impl<T> Feed<T>
where
T: RandomAccess<Error = Box<dyn std::error::Error + Send + Sync>> + Debug + Send,
{
/// Creates a feed on top of `storage`, reusing a stored keypair when one exists.
///
/// If `storage` already holds a (possibly partial) keypair, the feed is built
/// from it; the secret key is attached only when it is present. Otherwise a
/// fresh keypair is generated, written to `storage`, and used for the new feed.
///
/// # Errors
///
/// Returns an error if persisting a newly generated keypair fails or if
/// `FeedBuilder::build` fails.
pub async fn with_storage(mut storage: crate::storage::Storage<T>) -> Result<Self> {
    match storage.read_partial_keypair().await {
        Some(partial_keypair) => {
            let builder = FeedBuilder::new(partial_keypair.public, storage);
            // Match on the optional secret directly instead of testing
            // `is_none()` and then unwrapping through an unreachable error.
            match partial_keypair.secret {
                Some(secret) => builder.secret_key(secret).build().await,
                None => builder.build().await,
            }
        }
        None => {
            let keypair = generate_keypair();
            // Persist both halves before building so the feed can be reopened.
            storage.write_public_key(&keypair.public).await?;
            storage.write_secret_key(&keypair.secret).await?;
            FeedBuilder::new(keypair.public, storage)
                .secret_key(keypair.secret)
                .build()
                .await
        }
    }
}
/// Returns a `FeedBuilder` created from the given public key and storage.
pub fn builder(public_key: PublicKey, storage: Storage<T>) -> FeedBuilder<T> {
FeedBuilder::new(public_key, storage)
}
/// Returns the number of blocks in the feed (the `length` field).
#[inline]
pub fn len(&self) -> u64 {
self.length
}
/// Returns `true` when the feed's length is zero.
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Returns the total number of bytes in the feed (the `byte_length` field).
#[inline]
pub fn byte_len(&self) -> u64 {
self.byte_length
}
#[inline]
pub async fn append(&mut self, data: &[u8]) -> Result<()> {
let key = match &self.secret_key {
Some(key) => key,
None => bail!("no secret key, cannot append."),
};
self.merkle.next(data);
self.storage
.write_data(self.byte_length as u64, &data)
.await?;
let hash = Hash::from_roots(self.merkle.roots());
let index = self.length;
let message = hash_with_length_as_bytes(hash, index + 1);
let signature = sign(&self.public_key, key, &message);
self.storage.put_signature(index, signature).await?;
for node in self.merkle.nodes() {
self.storage.put_node(node).await?;
}
self.byte_length += data.len() as u64;
self.bitfield.set(index, true);
self.tree.set(tree_index(index));
self.length += 1;
let bytes = self.bitfield.to_bytes(&self.tree)?;
self.storage.put_bitfield(0, &bytes).await?;
Ok(())
}
/// Returns the last block of the feed, or `None` when the feed is empty.
///
/// The lookup goes through `get`, so `None` is also returned when the last
/// block is not available locally.
#[inline]
pub async fn head(&mut self) -> Result<Option<Vec<u8>>> {
    if self.is_empty() {
        return Ok(None);
    }
    let last = self.len() - 1;
    self.get(last).await
}
/// Returns whether the bitfield marks the block at `index` as available.
#[inline]
pub fn has(&mut self, index: u64) -> bool {
self.bitfield.get(index)
}
/// Returns whether every index in `range` is available locally.
///
/// Compares the number of indexes in `range` with the count reported by
/// `Bitfield::total_with_range` for the same range.
#[inline]
pub fn has_all(&mut self, range: ::std::ops::Range<u64>) -> bool {
    let expected = range.clone().count();
    let available = self.bitfield.total_with_range(range) as usize;
    expected == available
}
/// Returns the count reported by `Bitfield::total_with_range` for `range`.
// NOTE(review): the result is a `u8`; presumably the number of set bits in the
// range — confirm against `Bitfield::total_with_range`.
#[inline]
pub fn downloaded(&mut self, range: ::std::ops::Range<u64>) -> u8 {
self.bitfield.total_with_range(range)
}
/// Returns the block at `index`, or `None` when the bitfield does not mark
/// it as available.
///
/// # Errors
///
/// Returns an error if reading the block from storage fails.
#[inline]
pub async fn get(&mut self, index: u64) -> Result<Option<Vec<u8>>> {
    if self.bitfield.get(index) {
        let block = self.storage.get_data(index).await?;
        Ok(Some(block))
    } else {
        Ok(None)
    }
}
/// Builds a proof for the block at `index`; shorthand for
/// `proof_with_digest` with a digest of `0`.
#[inline]
pub async fn proof(&mut self, index: u64, include_hash: bool) -> Result<Proof> {
self.proof_with_digest(index, 0, include_hash).await
}
/// Builds a proof for the block at `index`, passing `digest` and
/// `include_hash` through to `TreeIndex::proof_with_digest`.
///
/// The returned proof carries the tree nodes named by the tree index, read
/// from storage, plus the signature stored at `verified_by / 2 - 1` when it
/// can be read.
///
/// # Errors
///
/// Fails when the tree index cannot produce a proof for `index` or when a
/// required node cannot be read from storage. A missing signature is not an
/// error; the proof is returned without one.
pub async fn proof_with_digest(
    &mut self,
    index: u64,
    digest: u64,
    include_hash: bool,
) -> Result<Proof> {
    let mut remote_tree = TreeIndex::default();
    // Scratch buffer required by the tree-index API.
    let mut scratch = vec![];

    let tree_proof = match self.tree.proof_with_digest(
        tree_index(index),
        digest,
        include_hash,
        &mut scratch,
        &mut remote_tree,
    ) {
        Some(tree_proof) => tree_proof,
        None => bail!("No proof available for index {}", index),
    };

    // `verified_by / 2` is a length; the matching signature sits one below.
    // A length of zero has no signature at all.
    let signature = match (tree_proof.verified_by() / 2).checked_sub(1) {
        Some(sig_index) => self.storage.get_signature(sig_index).await.ok(),
        None => None,
    };

    let wanted = tree_proof.nodes();
    let mut nodes = Vec::with_capacity(wanted.len());
    for node_index in wanted {
        nodes.push(self.storage.get_node(*node_index).await?);
    }

    Ok(Proof {
        nodes,
        signature,
        index,
    })
}
/// Returns the tree index's digest for the tree position of block `index`.
pub fn digest(&mut self, index: u64) -> u64 {
self.tree.digest(tree_index(index))
}
pub async fn put(&mut self, index: u64, data: Option<&[u8]>, mut proof: Proof) -> Result<()> {
let mut next = tree_index(index);
let mut trusted: Option<u64> = None;
let mut missing = vec![];
let mut i = match data {
Some(_) => 0,
None => 1,
};
loop {
if self.tree.get(next) {
trusted = Some(next);
break;
}
let sibling = flat::sibling(next);
next = flat::parent(next);
if i < proof.nodes.len() && proof.nodes[i].index == sibling {
i += 1;
continue;
}
if !self.tree.get(sibling) {
break;
}
missing.push(sibling);
}
if trusted.is_none() && self.tree.get(next) {
trusted = Some(next);
}
let mut missing_nodes = vec![];
for index in missing {
let node = self.storage.get_node(index).await?;
missing_nodes.push(node);
}
let mut trusted_node = None;
if let Some(index) = trusted {
let node = self.storage.get_node(index).await?;
trusted_node = Some(node);
}
let mut visited = vec![];
let mut top = match data {
Some(data) => Node::new(
tree_index(index),
Hash::from_leaf(&data).as_bytes().to_owned(),
data.len() as u64,
),
None => proof.nodes.remove(0),
};
if verify_node(&trusted_node, &top) {
self.write(index, data, &visited, None).await?;
return Ok(());
}
loop {
let node;
let next = flat::sibling(top.index);
if !proof.nodes.is_empty() && proof.nodes[0].index == next {
node = proof.nodes.remove(0);
visited.push(node.clone());
} else if !missing_nodes.is_empty() && missing_nodes[0].index == next {
node = missing_nodes.remove(0);
} else {
let nodes = self.verify_roots(&top, &mut proof).await?;
visited.extend_from_slice(&nodes);
self.write(index, data, &visited, proof.signature).await?;
return Ok(());
}
visited.push(top.clone());
let hash = Hash::from_hashes(&top, &node);
let len = top.len() + node.len();
top = Node::new(flat::parent(top.index), hash.as_bytes().into(), len);
if verify_node(&trusted_node, &top) {
self.write(index, data, &visited, None).await?;
return Ok(());
}
}
fn verify_node(trusted: &Option<Node>, node: &Node) -> bool {
match trusted {
None => false,
Some(trusted) => trusted.index == node.index && trusted.hash == node.hash,
}
}
}
/// Persists verified `nodes`, the optional block `data` and the optional
/// signature for `index`, then records them in the in-memory indexes.
///
/// Storage writes happen first; the tree index and bitfield are only
/// updated once all of them have succeeded.
async fn write(
    &mut self,
    index: u64,
    data: Option<&[u8]>,
    nodes: &[Node],
    sig: Option<Signature>,
) -> Result<()> {
    for verified in nodes.iter() {
        self.storage.put_node(verified).await?;
    }

    if let Some(block) = data {
        self.storage.put_data(index, block, &nodes).await?;
    }

    if let Some(sig) = sig {
        let sig = sig.borrow();
        self.storage.put_signature(index, sig).await?;
    }

    nodes.iter().for_each(|verified| {
        self.tree.set(verified.index);
    });
    self.tree.set(tree_index(index));

    if data.is_some() {
        // Whether the bit actually changed is currently unused.
        let _changed = self.bitfield.set(index, true).is_changed();
    }

    Ok(())
}
pub async fn signature(&mut self, index: u64) -> Result<Signature> {
ensure!(
index < self.length,
format!("No signature found for index {}", index)
);
self.storage.next_signature(index).await
}
/// Checks `signature` against the root hashes for `index`.
///
/// The signed message is the hash of the roots combined with the length
/// `index + 1`, verified with the feed's public key via `verify_compat`.
pub async fn verify(&mut self, index: u64, signature: &Signature) -> Result<()> {
    let roots: Vec<_> = self
        .root_hashes(index)
        .await?
        .into_iter()
        .map(Arc::new)
        .collect();
    let message = hash_with_length_as_bytes(Hash::from_roots(&roots), index + 1);
    verify_compat(&self.public_key, &message, Some(signature))
}
/// Calls `have(message)` on every peer except `from`.
pub fn announce(&mut self, message: &Message, from: &Peer) {
    self.peers
        .iter_mut()
        .filter(|peer| **peer != *from)
        .for_each(|peer| peer.have(message));
}
/// Calls `unhave(message)` on every peer.
pub fn unannounce(&mut self, message: &Message) {
    self.peers
        .iter_mut()
        .for_each(|peer| peer.unhave(message));
}
pub async fn root_hashes(&mut self, index: u64) -> Result<Vec<Node>> {
ensure!(
index <= self.length,
format!("Root index bounds exceeded {} > {}", index, self.length)
);
let roots_index = tree_index(index) + 2;
let mut indexes = vec![];
flat::full_roots(roots_index, &mut indexes);
let mut roots = Vec::with_capacity(indexes.len());
for index in indexes {
let node = self.storage.get_node(index).await?;
roots.push(node);
}
Ok(roots)
}
/// Returns a reference to the feed's public key.
pub fn public_key(&self) -> &PublicKey {
&self.public_key
}
/// Returns a reference to the feed's optional secret key; it is `None` for
/// a feed that cannot `append`.
pub fn secret_key(&self) -> &Option<SecretKey> {
&self.secret_key
}
/// Verifies the proof's signature over the tree roots implied by `top` and
/// the remaining proof nodes.
///
/// Roots are gathered from `top`, from the front of `proof.nodes` (which are
/// consumed), or from storage when the tree index marks them as present.
/// When the verified length exceeds the current one, the feed's `length` and
/// `byte_length` are updated.
///
/// Returns the roots that came from `top` or the proof, so the caller can
/// persist them.
///
/// # Errors
///
/// Fails when a required root is available from none of the three sources,
/// when reading a root from storage fails, or when signature verification
/// fails.
async fn verify_roots(&mut self, top: &Node, proof: &mut Proof) -> Result<Vec<Node>> {
    let last_node = if !proof.nodes.is_empty() {
        proof.nodes[proof.nodes.len() - 1].index
    } else {
        top.index
    };

    let verified_by = cmp::max(flat::right_span(top.index), flat::right_span(last_node)) + 2;
    // Number of blocks covered by the signature being checked.
    let length = verified_by / 2;

    let mut indexes = vec![];
    flat::full_roots(verified_by, &mut indexes);
    let mut roots = Vec::with_capacity(indexes.len());
    let mut extra_nodes = vec![];

    for index in indexes {
        if index == top.index {
            extra_nodes.push(top.clone());
            roots.push(top.clone());
        } else if !proof.nodes.is_empty() && index == proof.nodes[0].index {
            extra_nodes.push(proof.nodes[0].clone());
            roots.push(proof.nodes.remove(0));
        } else if self.tree.get(index) {
            let node = self.storage.get_node(index).await?;
            roots.push(node);
        } else {
            bail!("<hypercore>: Missing tree roots needed for verify");
        }
    }

    let checksum = Hash::from_roots(&roots);
    let message = hash_with_length_as_bytes(checksum, length);
    verify_compat(&self.public_key, &message, proof.signature())?;

    // Update the length if the verified roots describe a longer feed.
    if length > self.len() {
        self.length = length;
        // The byte length is the sum of the bytes spanned by each root, not
        // the sum of the roots' tree positions.
        self.byte_length = roots.iter().fold(0, |acc, root| acc + root.len());
    }

    Ok(extra_nodes)
}
pub async fn audit(&mut self) -> Result<Audit> {
let mut valid_blocks = 0;
let mut invalid_blocks = 0;
for index in 0..self.length {
if self.bitfield.get(index) {
let node = self.storage.get_node(2 * index).await?;
let data = self.storage.get_data(index).await?;
let data_hash = Hash::from_leaf(&data);
if node.hash == data_hash.as_bytes() {
valid_blocks += 1;
} else {
invalid_blocks += 1;
self.bitfield.set(index, false);
}
}
}
Ok(Audit {
valid_blocks,
invalid_blocks,
})
}
/// Returns a reference to the feed's bitfield.
pub fn bitfield(&self) -> &Bitfield {
&self.bitfield
}
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
pub fn download(&mut self, _range: Range<u64>) -> Result<()> {
unimplemented!();
}
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
pub fn undownload(&mut self, _range: Range<u64>) -> Result<()> {
unimplemented!();
}
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
pub fn finalize(&mut self) -> Result<()> {
unimplemented!();
}
/// Calls `update()` on every peer.
pub fn update_peers(&mut self) {
    self.peers.iter_mut().for_each(|peer| {
        peer.update();
    });
}
}
impl Feed<RandomAccessDisk> {
pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
if let Err(e) = std::fs::create_dir_all(&path) {
return Err(anyhow::Error::msg(format!(
"Failed to create directory {} because of: {}",
path.as_ref().display(),
e
)));
}
let dir = path.as_ref().to_owned();
let storage = Storage::new_disk(&dir, false).await?;
Self::with_storage(storage).await
}
}
impl Default for Feed<RandomAccessMemory> {
    /// Creates an in-memory feed, blocking the current thread until it is
    /// ready.
    ///
    /// # Panics
    ///
    /// Panics if the in-memory storage or the feed cannot be created.
    fn default() -> Self {
        let setup = async {
            let memory = Storage::new_memory().await.unwrap();
            Self::with_storage(memory).await.unwrap()
        };
        async_std::task::block_on(setup)
    }
}
impl<T: RandomAccess<Error = Box<dyn std::error::Error + Send + Sync>> + Debug + Send> Display
    for Feed<T>
{
    /// Formats the feed as
    /// `Hypercore(key=[..], length=.., byte_length=.., peers=..)`.
    ///
    /// # Panics
    ///
    /// Panics if `pretty_hash::fmt` fails to format the public key.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let key = pretty_fmt(&self.public_key.to_bytes()).unwrap();
        // Report the actual number of peers instead of a hard-coded zero.
        write!(
            f,
            "Hypercore(key=[{}], length={}, byte_length={}, peers={})",
            key,
            self.len(),
            self.byte_len(),
            self.peers.len()
        )
    }
}
/// Converts a block index into the flat-tree index of its leaf node
/// (twice the block index).
#[inline]
fn tree_index(index: u64) -> u64 {
    index * 2
}
/// Builds the message that gets signed: the hash bytes followed by `length`
/// encoded as eight big-endian bytes.
fn hash_with_length_as_bytes(hash: Hash, length: u64) -> Vec<u8> {
    // `concat` already yields an owned `Vec<u8>`; no further copy is needed.
    [hash.as_bytes(), &length.to_be_bytes()].concat()
}
/// Verifies `sig` over `msg`, falling back to the first 32 bytes of `msg`
/// when verification of the full message fails.
///
/// NOTE(review): the fallback presumably exists for signatures made over the
/// hash alone, without the appended length — confirm against the signing
/// history.
///
/// # Errors
///
/// Returns the fallback verification error when both attempts fail. If `msg`
/// is shorter than 32 bytes there is no prefix to retry with, so the error
/// from the first attempt is returned instead of panicking.
pub fn verify_compat(public: &PublicKey, msg: &[u8], sig: Option<&Signature>) -> Result<()> {
    match verify(public, msg, sig) {
        Ok(_) => Ok(()),
        Err(err) => match msg.get(..32) {
            Some(prefix) => verify(public, prefix, sig),
            None => Err(err),
        },
    }
}