bpm_core 0.1.0

Cross-platform software helping to verify, prove and secure package compilation, building, fetching, and deployment. Core package.
Documentation
use crate::blockchains::blockchain::{BlockchainClient, BlockchainIO};
use crate::blockchains::errors::blockchain_error::BlockchainError;
use std::convert::TryFrom;
use std::{env, str::FromStr, sync::Arc, time::Duration};

use futures_util::TryStreamExt;
use hedera::{AccountId, Client, PrivateKey, TopicId, TopicMessageSubmitTransaction};
pub mod hedera_mirror {
    tonic::include_proto!("mirror");
}

use hedera_mirror::{
    com::hedera::mirror::api::proto::{
        consensus_service_client::ConsensusServiceClient, ConsensusTopicQuery,
        ConsensusTopicResponse,
    },
    proto::{Timestamp, TopicId as MirrorTopicId},
};

use tokio::sync::{mpsc::Sender, Mutex};

use log::{debug, trace};
use tonic::{
    transport::{Channel, ClientTlsConfig},
    Streaming,
};

struct HederaBlockchainIO {
    network: String,
    last_sync: u64,
    packages_topic: TopicId,
    hedera_client: Client,
}

impl HederaBlockchainIO {
    /**
     * Create new gRPC channel to HCS
     */
    async fn new_channel(&self) -> Result<Channel, BlockchainError> {
        debug!("Establishing new HCS channel...");
        let tls = ClientTlsConfig::new().with_native_roots();

        let remote_url = format!("https://{}", self.network.to_string()); // We must prefix scheme

        let channel = Channel::from_shared(remote_url)
            .map_err(|_| BlockchainError::ConnectionConfig)?
            .tls_config(tls)
            .map_err(|_| BlockchainError::ConnectionConfig)?
            .connect()
            .await
            .map_err(|_| BlockchainError::ConnectionFailure)?;

        debug!("Done establishing new HCS channel !");

        Ok(channel)
    }

    /**
     * Subscribe to topic then return associated stream
     */
    async fn new_topic_subscription(
        &self,
        topic: TopicId,
        start_timestamp: u64,
    ) -> Result<Streaming<ConsensusTopicResponse>, BlockchainError> {
        debug!("Creating new topic subscription...");

        let query = ConsensusTopicQuery {
            topic_id: Some(MirrorTopicId {
                realm_num: i64::try_from(topic.realm)
                    .expect("Could not convert topic realm to i64"),
                shard_num: i64::try_from(topic.shard).expect("Could not convert shard to i64"),
                topic_num: i64::try_from(topic.num).expect("Could not convert topic num to i64"),
            }),
            consensus_start_time: Some(Timestamp {
                nanos: 0,
                seconds: i64::try_from(start_timestamp)
                    .expect("Could not convert start time seconds to i64"),
            }),
            consensus_end_time: None,
            limit: 0,
        };

        let reading_channel = self.new_channel().await?;

        let mut mirror_client = ConsensusServiceClient::new(reading_channel.clone());

        const TIMEOUT: u64 = 1;
        let response = tokio::time::timeout(
            Duration::from_secs(TIMEOUT),
            mirror_client.subscribe_topic(query),
        )
        .await
        .map_err(|_| BlockchainError::NoPackagesData)?;

        let stream = response.unwrap().into_inner();
        debug!("Done creating new topic subscription !");
        Ok(stream)
    }
}

#[async_trait::async_trait]
impl BlockchainIO for HederaBlockchainIO {
    /**
     * Write to HCS
     */
    async fn write(&self, data: &[u8]) {
        TopicMessageSubmitTransaction::new()
            .topic_id(self.packages_topic)
            .message(data)
            .execute(&self.hedera_client)
            .await
            .unwrap();
    }

    /**
     * Read from HCS
     */
    async fn read(&self, tx_data: &Sender<Result<Vec<u8>, BlockchainError>>) {
        let stream_res = self
            .new_topic_subscription(self.packages_topic, self.last_sync)
            .await;

        let mut stream = match stream_res {
            Ok(stream) => stream,
            Err(e) => {
                tx_data.send(Err(e)).await.unwrap();
                return ();
            }
        };

        const NEXT_MESSAGE_TIMEOUT: u64 = 1;

        while let Ok(result) =
            tokio::time::timeout(Duration::from_secs(NEXT_MESSAGE_TIMEOUT), stream.try_next()).await
        {
            trace!("Sending to channel...");
            let response = result.unwrap().unwrap();

            let buf: Vec<u8> = Vec::from(response.message.as_slice());

            tx_data.send(Ok(buf)).await.unwrap();
            trace!("Done sending to channel !");
        }
    }
}

#[derive(Debug, Clone)]
pub struct HederaBlockchain {
    blockchain_client: Client,
    packages_topic: TopicId,
    last_sync: Arc<Mutex<u64>>,
}

#[async_trait::async_trait]
impl BlockchainClient for HederaBlockchain {
    /**
     * Get HCS network
     */
    fn get_network(&self) -> String {
        let networks = self.blockchain_client.mirror_network();

        let network = String::from(networks.first().unwrap());

        network
    }

    /**
     * Get blockchain label
     */
    fn get_label(&self) -> String {
        String::from("hedera")
    }

    /**
     * Create HCS IO
     */
    async fn create_io(&self) -> Box<dyn BlockchainIO> {
        // Hedera IO is short lived so it's fine to deref
        let last_sync = *self.last_sync.lock().await;
        let hedera_io = HederaBlockchainIO {
            network: self.get_network(),
            last_sync,
            packages_topic: self.packages_topic,
            hedera_client: self.blockchain_client.clone(),
        };

        Box::new(hedera_io)
    }

    /**
     * Get last sync
     */
    async fn get_last_sync(&self) -> u64 {
        let last_sync = self.last_sync.lock().await;
        *last_sync
    }

    /**
     * Set last sync
     */
    async fn set_last_sync(&self, last_sync: u64) {
        let mut last_sync_lock = self.last_sync.lock().await;

        *last_sync_lock = last_sync;
    }
}

impl From<&str> for HederaBlockchain {
    /**
     * Build from HCS topic ID
     */
    fn from(package_topic_id: &str) -> Self {
        debug!("Creating Hedera Blockchain Client using default parameters...");

        let blockchain_client = Client::for_testnet();

        // TODO : temporary, use config manager
        let debug_account = env::var("BPM_ACCOUNT").unwrap_or(String::from(""));
        let debug_key = env::var("BPM_KEY").unwrap_or(String::from(""));

        if debug_account != "" && debug_key != "" {
            let account_id = AccountId::from_str(debug_account.as_str()).unwrap();
            let private_key = PrivateKey::from_str(debug_key.as_str()).unwrap();
            blockchain_client.set_operator(account_id, private_key);
        }

        let topic = TopicId::from_str(&package_topic_id).unwrap();

        let client = Self {
            blockchain_client,
            packages_topic: topic,
            last_sync: Arc::new(Mutex::new(0)),
        };

        let net_addr = client
            .blockchain_client
            .mirror_network()
            .first()
            .unwrap()
            .to_string();

        debug!(
            "Done creating Hedera Blockchain Client using network address : {} !",
            net_addr
        );

        client
    }
}