use crate::filter::BlockSpendFilter;
use crate::{OracleSetup, SignedAttestation};
use async_trait::async_trait;
use bitcoin::hash_types::FilterHeader;
use bitcoin::hashes::Hash;
use bitcoin::secp256k1::{All, Secp256k1};
use bitcoin::{Block, BlockHash};
use bytes::Bytes;
use core::time::Duration;
use log::*;
use reqwest::{Client, StatusCode};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fs;
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;
use url::Url;
/// Number of seconds passed to the HTTP client's `tcp_keepalive` setting in `HttpSource::new`.
const CLIENT_KEEP_ALIVE_SECONDS: u64 = 5;
/// Errors produced while fetching or validating attestations.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into `Box<dyn std::error::Error>` and similar
/// error containers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// The requested data could not be found at the source.
    NotExists,
    /// The request failed, returned an unexpected status, or the response
    /// could not be read or parsed.
    Transport,
    /// The attestation for the preceding block could not be found.
    PredecessorNotExists,
    /// The attested filter header does not match the one computed from the
    /// block and the preceding attestation.
    FilterHeaderMismatch,
    /// The attestation signature did not verify against the oracle public key.
    InvalidSignature,
    /// The attestation refers to a different block hash than the one requested.
    Invalid,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Error::NotExists => "requested data does not exist at the source",
            Error::Transport => "transport error while fetching from the source",
            Error::PredecessorNotExists => "attestation for the preceding block does not exist",
            Error::FilterHeaderMismatch => {
                "attested filter header does not match the computed filter header"
            }
            Error::InvalidSignature => "attestation signature is invalid",
            Error::Invalid => "attestation does not match the requested block",
        };
        f.write_str(message)
    }
}

impl std::error::Error for Error {}
/// Builds a [`Source`] from a URL.
///
/// * `file` URLs produce a [`FileSource`] rooted at the URL's filesystem path.
/// * `http` / `https` URLs produce an [`HttpSource`].
///
/// # Errors
///
/// Propagates any error returned by [`HttpSource::new`].
///
/// # Panics
///
/// Panics if the scheme is not `file`, `http` or `https`, or if the path of a
/// `file` URL does not exist.
pub async fn source_from_url(url: Url) -> Result<Box<dyn Source>, Error> {
    let source: Box<dyn Source> = match url.scheme() {
        "file" => {
            // `Url::path()` is percent-encoded (e.g. a space is `%20`), so it is
            // not a usable filesystem path in general. Prefer the decoded,
            // platform-aware conversion and only fall back to the raw path
            // when the URL cannot be converted (e.g. it carries a host).
            let path: PathBuf = url
                .to_file_path()
                .unwrap_or_else(|_| url.path().into());
            if !path.exists() {
                panic!("{} does not exist", path.to_string_lossy());
            }
            Box::new(FileSource::new(path))
        }
        "http" | "https" => Box::new(HttpSource::new(url).await?),
        _ => panic!("unknown scheme {}", url.scheme()),
    };
    Ok(source)
}
#[async_trait]
pub trait Source: Send + Sync {
async fn get(
&self,
height: u32,
block: &Block,
) -> Result<(SignedAttestation, FilterHeader), Error> {
let setup = self.oracle_setup().await;
let attestation = self.get_unchecked(height, &block.block_hash()).await?;
let mut prev_filter_header = FilterHeader::all_zeros();
if height > 1 {
let prev_attestation = self
.get_unchecked(height - 1, &block.header.prev_blockhash)
.await
.map_err(|e| match e {
Error::NotExists => Error::PredecessorNotExists,
e => e,
})?;
let filter = BlockSpendFilter::from_block(block);
let filter_hash = filter.filter_hash();
if attestation.attestation.filter_header
!= filter_hash.filter_header(&prev_attestation.attestation.filter_header)
{
return Err(Error::FilterHeaderMismatch);
}
prev_filter_header = prev_attestation.attestation.filter_header;
}
if !attestation.verify(&setup.public_key, self.secp()) {
return Err(Error::InvalidSignature);
}
if attestation.attestation.block_hash != block.block_hash() {
return Err(Error::Invalid);
}
Ok((attestation, prev_filter_header))
}
async fn get_unchecked(
&self,
height: u32,
block_hash: &BlockHash,
) -> Result<SignedAttestation, Error>;
async fn oracle_setup(&self) -> &OracleSetup;
fn secp(&self) -> &Secp256k1<All>;
async fn on_new_block(&self, block_height: u32, block: &Block);
}
/// Returns the attestation file path relative to a data directory:
/// `public/<height zero-padded to 7 digits>-<hash>.sa`.
pub fn attestation_path(height: u32, hash: &BlockHash) -> String {
    format!(
        "public/{padded_height:07}-{block_hash}.sa",
        padded_height = height,
        block_hash = hash
    )
}
/// Returns the bare attestation file name:
/// `<height zero-padded to 7 digits>-<hash>.sa`.
pub fn attestation_filename(height: u32, hash: &BlockHash) -> String {
    format!(
        "{padded_height:07}-{block_hash}.sa",
        padded_height = height,
        block_hash = hash
    )
}
/// A [`Source`] that reads the oracle setup and attestations from YAML files
/// under a local data directory.
pub struct FileSource {
/// Oracle setup loaded from `public/config` under `datadir`.
setup: OracleSetup,
/// Root directory that attestation paths are joined onto.
datadir: PathBuf,
/// secp256k1 context returned by `Source::secp`.
secp: Secp256k1<All>,
}
impl FileSource {
    /// Creates a file-backed source rooted at `datadir`, loading the oracle
    /// setup from `public/config` inside it.
    ///
    /// # Panics
    ///
    /// Panics if `public/config` cannot be opened or cannot be parsed.
    pub fn new(datadir: PathBuf) -> Self {
        info!("Loading oracle setup from {}", datadir.display());
        let setup = match read_yaml_from_file(&datadir, "public/config") {
            Some(loaded) => loaded,
            None => panic!("{} does not exist", datadir.join("public/config").display()),
        };
        let secp = Secp256k1::new();
        Self {
            setup,
            datadir,
            secp,
        }
    }

    /// Returns the data directory this source reads from.
    pub fn datadir(&self) -> &PathBuf {
        &self.datadir
    }
}
#[async_trait]
impl Source for FileSource {
async fn get_unchecked(
&self,
height: u32,
block_hash: &BlockHash,
) -> Result<SignedAttestation, Error> {
let filename = attestation_path(height, block_hash);
let attestation = read_yaml_from_file(&self.datadir, &filename).ok_or(Error::NotExists)?;
Ok(attestation)
}
async fn oracle_setup(&self) -> &OracleSetup {
&self.setup
}
fn secp(&self) -> &Secp256k1<All> {
&self.secp
}
async fn on_new_block(&self, _block_height: u32, _block: &Block) {}
}
/// Reads and deserializes the YAML file `name` located under `datadir`.
///
/// Returns `None` when the file cannot be opened (for any reason, not only
/// because it is missing).
///
/// # Panics
///
/// Panics with `cannot parse <name>: <error>` if the file opens but its
/// contents cannot be deserialized into `T`.
pub fn read_yaml_from_file<T: DeserializeOwned>(
    datadir: &std::path::Path,
    name: &str,
) -> Option<T> {
    let file = File::open(datadir.join(name)).ok()?;
    // Build the panic message lazily so the success path does not allocate it.
    let data = serde_yaml::from_reader(BufReader::new(file))
        .unwrap_or_else(|e| panic!("cannot parse {}: {:?}", name, e));
    Some(data)
}
/// Serializes `data` as YAML into the file `name` under `datadir`.
///
/// The data is first written to a sibling file with the extension replaced by
/// `tmp`, synced to disk, and then renamed over the destination, so the
/// destination is replaced in a single rename step.
///
/// # Panics
///
/// Panics if the temporary file cannot be created, written, synced, or
/// renamed.
pub fn write_yaml_to_file<T: Serialize>(datadir: &std::path::Path, name: &str, data: T) {
    let path = datadir.join(name);
    let temp_path = path.with_extension("tmp");
    // Panic messages are built lazily so the success path does not allocate them.
    let file = File::create(&temp_path)
        .unwrap_or_else(|e| panic!("cannot create {}: {:?}", name, e));
    serde_yaml::to_writer(&file, &data)
        .unwrap_or_else(|e| panic!("cannot write {}: {:?}", name, e));
    // Flush file contents to disk before the rename; otherwise a crash shortly
    // after the rename could leave a truncated destination file.
    file.sync_all()
        .unwrap_or_else(|e| panic!("cannot sync {}: {:?}", name, e));
    drop(file);
    fs::rename(temp_path, path).unwrap_or_else(|e| panic!("cannot rename {}: {:?}", name, e));
}
/// A [`Source`] that fetches the oracle setup and attestations over HTTP(S).
pub struct HttpSource {
/// Oracle setup parsed from the `config` resource fetched at construction.
setup: OracleSetup,
/// secp256k1 context returned by `Source::secp`.
secp: Secp256k1<All>,
/// HTTP client reused for every request.
client: Client,
/// Base URL that file names are joined onto; its path ends with `/`.
url: Url,
}
impl HttpSource {
pub async fn new(mut url: Url) -> Result<Self, Error> {
if !url.path().ends_with('/') {
url.set_path(&format!("{}/", url.path()));
}
let client = Client::builder()
.tcp_keepalive(Some(Duration::from_secs(CLIENT_KEEP_ALIVE_SECONDS)))
.build()
.expect("client creation");
info!("Loading oracle setup from {}", url);
let body = Self::get(&url, "config", &client).await?;
let setup = serde_yaml::from_reader(body.as_ref()).map_err(|e| {
warn!("cannot parse config: {}", e);
Error::Transport
})?;
Ok(Self {
setup,
secp: Secp256k1::new(),
client,
url,
})
}
async fn get(url: &Url, filename: &str, client: &Client) -> Result<Bytes, Error> {
let res = client
.get(url.join(filename).expect("url join filename"))
.send()
.await
.map_err(|e| {
warn!("cannot get {}: {}", url, e);
Error::Transport
})?;
if res.status() != StatusCode::OK {
warn!("bad status {}: {}", res.status(), url);
return Err(Error::Transport);
}
let s = res.bytes().await.map_err(|e| {
warn!("cannot read {}: {}", url, e);
Error::Transport
})?;
Ok(s)
}
}
#[async_trait]
impl Source for HttpSource {
async fn get_unchecked(
&self,
height: u32,
block_hash: &BlockHash,
) -> Result<SignedAttestation, Error> {
let filename = attestation_filename(height, block_hash);
let body = Self::get(&self.url, &filename, &self.client).await?;
let attestation = serde_yaml::from_reader(body.as_ref()).map_err(|e| {
warn!("cannot parse {}: {}", filename, e);
Error::Transport
})?;
Ok(attestation)
}
async fn oracle_setup(&self) -> &OracleSetup {
&self.setup
}
fn secp(&self) -> &Secp256k1<All> {
&self.secp
}
async fn on_new_block(&self, _block_height: u32, _block: &Block) {
}
}