txoo 0.10.0

A Bitcoin transaction-output oracle
Documentation
use crate::filter::BlockSpendFilter;
use crate::{OracleSetup, SignedAttestation};
use async_trait::async_trait;
use bitcoin::hash_types::FilterHeader;
use bitcoin::hashes::Hash;
use bitcoin::secp256k1::{All, Secp256k1};
use bitcoin::{Block, BlockHash};
use bytes::Bytes;
use core::time::Duration;
use log::*;
use reqwest::{Client, StatusCode};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fs;
use std::fs::File;
use std::io::BufReader;
use std::path::PathBuf;
use url::Url;

const CLIENT_KEEP_ALIVE_SECONDS: u64 = 5;

/// An error during attestation retrieval
#[derive(Debug, PartialEq)]
pub enum Error {
    /// The attestation does not exist
    NotExists,
    /// Unable to retrieve the attestation
    Transport,
    /// The previous attestation does not exist
    PredecessorNotExists,
    /// The filter header does not match the block and/or the previous filter header
    FilterHeaderMismatch,
    /// The attestation signature is invalid
    InvalidSignature,
    /// The attestation is otherwise invalid
    Invalid,
}

/// Build a source from a URL (either file:// or http(s)://)
pub async fn source_from_url(url: Url) -> Result<Box<dyn Source>, Error> {
    let source: Box<dyn Source> = match url.scheme() {
        "file" => {
            let path: PathBuf = url.path().into();
            if !path.exists() {
                panic!("{} does not exist", path.to_string_lossy());
            }
            Box::new(FileSource::new(path))
        }
        "http" | "https" => Box::new(HttpSource::new(url).await?),
        _ => panic!("unknown scheme {}", url.scheme()),
    };
    Ok(source)
}

/// An attestation source
#[async_trait]
pub trait Source: Send + Sync {
    /// Get an attestation for the given block, with full validation
    /// Also returns the previous filter header
    async fn get(
        &self,
        height: u32,
        block: &Block,
    ) -> Result<(SignedAttestation, FilterHeader), Error> {
        let setup = self.oracle_setup().await;
        let attestation = self.get_unchecked(height, &block.block_hash()).await?;
        // we assume filter header for genesis block as all zeros
        let mut prev_filter_header = FilterHeader::all_zeros();
        if height > 1 {
            let prev_attestation = self
                .get_unchecked(height - 1, &block.header.prev_blockhash)
                .await
                .map_err(|e| match e {
                    Error::NotExists => Error::PredecessorNotExists,
                    e => e,
                })?;
            let filter = BlockSpendFilter::from_block(block);
            let filter_hash = filter.filter_hash();

            // check that filter header chain links correctly
            if attestation.attestation.filter_header
                != filter_hash.filter_header(&prev_attestation.attestation.filter_header)
            {
                return Err(Error::FilterHeaderMismatch);
            }
            prev_filter_header = prev_attestation.attestation.filter_header;
        }

        if !attestation.verify(&setup.public_key, self.secp()) {
            return Err(Error::InvalidSignature);
        }
        if attestation.attestation.block_hash != block.block_hash() {
            return Err(Error::Invalid);
        }
        Ok((attestation, prev_filter_header))
    }

    /// Get an attestation for the given block, without checking it
    async fn get_unchecked(
        &self,
        height: u32,
        block_hash: &BlockHash,
    ) -> Result<SignedAttestation, Error>;

    /// Get the oracle configuration
    async fn oracle_setup(&self) -> &OracleSetup;

    /// Get the secp256k1 context
    fn secp(&self) -> &Secp256k1<All>;

    /// Notify the source of a new block, in case the source passively updates
    async fn on_new_block(&self, block_height: u32, block: &Block);
}

/// Standard path for an attestation file
pub fn attestation_path(height: u32, hash: &BlockHash) -> String {
    format!("public/{:07}-{}.sa", height, hash)
}

/// Standard filename for an attestation file
pub fn attestation_filename(height: u32, hash: &BlockHash) -> String {
    format!("{:07}-{}.sa", height, hash)
}

/// A source that reads attestations from a directory
pub struct FileSource {
    setup: OracleSetup,
    datadir: PathBuf,
    secp: Secp256k1<All>,
}

impl FileSource {
    /// Create a new FileSource
    pub fn new(datadir: PathBuf) -> Self {
        info!("Loading oracle setup from {}", datadir.display());
        let setup = read_yaml_from_file(&datadir, "public/config").unwrap_or_else(|| {
            panic!("{} does not exist", datadir.join("public/config").display())
        });
        Self {
            setup,
            datadir,
            secp: Secp256k1::new(),
        }
    }

    /// Get the data directory
    pub fn datadir(&self) -> &PathBuf {
        &self.datadir
    }
}

#[async_trait]
impl Source for FileSource {
    async fn get_unchecked(
        &self,
        height: u32,
        block_hash: &BlockHash,
    ) -> Result<SignedAttestation, Error> {
        let filename = attestation_path(height, block_hash);
        let attestation = read_yaml_from_file(&self.datadir, &filename).ok_or(Error::NotExists)?;
        Ok(attestation)
    }

    async fn oracle_setup(&self) -> &OracleSetup {
        &self.setup
    }

    fn secp(&self) -> &Secp256k1<All> {
        &self.secp
    }

    async fn on_new_block(&self, _block_height: u32, _block: &Block) {}
}

/// Read a YAML file
pub fn read_yaml_from_file<T: DeserializeOwned>(datadir: &PathBuf, name: &str) -> Option<T> {
    let path = datadir.join(name);
    if let Ok(file) = File::open(&path) {
        let reader = BufReader::new(file);
        let data = serde_yaml::from_reader(reader).expect(&format!("cannot parse {}", name));
        Some(data)
    } else {
        None
    }
}

/// Write a YAML file
pub fn write_yaml_to_file<T: Serialize>(datadir: &PathBuf, name: &str, data: T) {
    let path = datadir.join(name);
    let temp_path = path.with_extension("tmp");
    let file = File::create(&temp_path).expect(&format!("cannot create {}", name));
    serde_yaml::to_writer(&file, &data).expect(&format!("cannot write {}", name));
    drop(file);
    fs::rename(temp_path, path).expect(&format!("cannot rename {}", name));
}

/// A source that reads attestations from an HTTP server
pub struct HttpSource {
    setup: OracleSetup,
    secp: Secp256k1<All>,
    client: Client,
    url: Url,
}

impl HttpSource {
    /// Create a new HttpSource
    pub async fn new(mut url: Url) -> Result<Self, Error> {
        // URL might not have a trailing slash, which will cause the last path component to be
        // replaced instead of appended
        if !url.path().ends_with('/') {
            url.set_path(&format!("{}/", url.path()));
        }

        let client = Client::builder()
            .tcp_keepalive(Some(Duration::from_secs(CLIENT_KEEP_ALIVE_SECONDS)))
            .build()
            .expect("client creation");

        info!("Loading oracle setup from {}", url);
        let body = Self::get(&url, "config", &client).await?;
        let setup = serde_yaml::from_reader(body.as_ref()).map_err(|e| {
            warn!("cannot parse config: {}", e);
            Error::Transport
        })?;
        Ok(Self {
            setup,
            secp: Secp256k1::new(),
            client,
            url,
        })
    }

    async fn get(url: &Url, filename: &str, client: &Client) -> Result<Bytes, Error> {
        let res = client
            .get(url.join(filename).expect("url join filename"))
            .send()
            .await
            .map_err(|e| {
                warn!("cannot get {}: {}", url, e);
                Error::Transport
            })?;
        if res.status() != StatusCode::OK {
            warn!("bad status {}: {}", res.status(), url);
            return Err(Error::Transport);
        }
        let s = res.bytes().await.map_err(|e| {
            warn!("cannot read {}: {}", url, e);
            Error::Transport
        })?;
        Ok(s)
    }
}

#[async_trait]
impl Source for HttpSource {
    async fn get_unchecked(
        &self,
        height: u32,
        block_hash: &BlockHash,
    ) -> Result<SignedAttestation, Error> {
        let filename = attestation_filename(height, block_hash);
        let body = Self::get(&self.url, &filename, &self.client).await?;
        let attestation = serde_yaml::from_reader(body.as_ref()).map_err(|e| {
            warn!("cannot parse {}: {}", filename, e);
            Error::Transport
        })?;
        Ok(attestation)
    }

    async fn oracle_setup(&self) -> &OracleSetup {
        &self.setup
    }

    fn secp(&self) -> &Secp256k1<All> {
        &self.secp
    }

    async fn on_new_block(&self, _block_height: u32, _block: &Block) {
        // ignore
    }
}