fabric-sdk 0.4.3

Interact and program chaincode for the Hyperledger Fabric blockchain network
Documentation
use std::{collections::HashMap, sync::Arc};

use futures_channel::mpsc::Receiver;
use futures_util::StreamExt;
use prost::Message;
use tokio::sync::Mutex;

use crate::{
    chaincode::message::MessageBuilder,
    fabric::{
        common::{ChannelHeader, Header, SignatureHeader},
        protos::{
            ChaincodeEvent, ChaincodeMessage, DelState, GetState, GetStateByRange, Proposal,
            PutState, QueryResponse, QueryStateNext, SignedProposal, chaincode_message,
        },
        queryresult::Kv,
    },
};

static UNSPECIFIED_START_KEY: &str = "\u{0001}";

#[derive(Clone)]
pub struct Context {
    pub(crate) message_builder: Arc<Mutex<MessageBuilder>>,
    pub(crate) peer_response_queue: Arc<Mutex<Receiver<ChaincodeMessage>>>,
    pub(crate) message: ChaincodeMessage,
}
impl Context {
    pub(crate) fn new(
        message_builder: Arc<Mutex<MessageBuilder>>,
        message: ChaincodeMessage,
        peer_response_queue: Arc<Mutex<Receiver<ChaincodeMessage>>>,
    ) -> Context {
        Context {
            message_builder,
            message,
            peer_response_queue,
        }
    }

    //Getter

    pub async fn get_state(&self, key: &str) -> Vec<u8> {
        let payload = GetState {
            key: key.to_string(),
            collection: String::new(), //TODO Implement Collection (private write set)
        }
        .encode_to_vec();
        let message_context = self.message.clone();
        self.message_builder
            .lock()
            .await
            .respond(chaincode_message::Type::GetState, payload, message_context)
            .await;
        self.peer_response_queue
            .lock()
            .await
            .next()
            .await
            .expect("[Context] Failed to receive response from channel")
            .payload
    }

    pub async fn get_state_string(&self, key: &str) -> String {
        String::from_utf8(self.get_state(key).await).expect("[Context] Invalid UTF-8 encoding")
    }

    pub async fn get_state_by_range(&self, start_key: &str, end_key: &str) -> RangeResult {
        let start_key = if start_key.is_empty() {
            UNSPECIFIED_START_KEY
        } else {
            start_key
        };

        let payload = GetStateByRange {
            start_key: start_key.to_string(),
            end_key: end_key.to_string(),
            collection: String::new(), //TODO Implement Collection (private write set),
            metadata: vec![],
        }
        .encode_to_vec();

        let message_context = self.message.clone();
        self.message_builder
            .lock()
            .await
            .respond(
                chaincode_message::Type::GetStateByRange,
                payload,
                message_context,
            )
            .await;
        let response = self
            .peer_response_queue
            .lock()
            .await
            .next()
            .await
            .expect("[Context] Failed to receive response from channel");
        let query_response = QueryResponse::decode(response.payload.as_slice())
            .expect("[Context] Invalid query response");
        RangeResult::new(
            self.message_builder.clone(),
            self.peer_response_queue.clone(),
            self.message.clone(),
            query_response,
        )
    }

    //Setter

    pub async fn put_state(&self, key: &str, value: Vec<u8>) {
        let payload = PutState {
            key: key.to_string(),
            value,
            collection: String::new(), //TODO Implement Collection (private write set)
        }
        .encode_to_vec();
        let message_context = self.message.clone();
        self.message_builder
            .lock()
            .await
            .respond(chaincode_message::Type::PutState, payload, message_context)
            .await;
        self.peer_response_queue
            .lock()
            .await
            .next()
            .await
            .expect("[Context] Failed to receive response from channel");
    }

    pub async fn put_state_string(&self, key: &str, value: &str) {
        self.put_state(key, value.as_bytes().to_vec()).await;
    }

    pub async fn del_state(&self, key: &str) {
        let payload = DelState {
            key: key.to_string(),
            collection: String::new(), //TODO Implement Collection (private write set)
        }
        .encode_to_vec();
        let message_context = self.message.clone();
        self.message_builder
            .lock()
            .await
            .respond(chaincode_message::Type::DelState, payload, message_context)
            .await;
        self.peer_response_queue
            .lock()
            .await
            .next()
            .await
            .expect("[Context] Failed to receive response from channel");
    }

    /// Returns the transaction timestamp in seconds.
    pub fn get_tx_timestamp(&self) -> i64 {
        let proposal = Proposal::decode(
            self.message
                .proposal
                .as_ref()
                .expect("No signed proposal")
                .proposal_bytes
                .as_slice(),
        )
        .expect("Invalid proposal bytes");
        let header = Header::decode(proposal.header.as_slice()).expect("Invalid header");
        let channel_header = ChannelHeader::decode(header.channel_header.as_slice())
            .expect("Invalid channel header");
        channel_header.timestamp.expect("No timestamp").seconds
    }

    /// Returns the channel id of the chaincode message. This value is being cloned.
    pub fn get_channel_id(&self) -> String {
        self.message.channel_id.clone()
    }

    /// Returns the transaction id of the chaincode message. This value is being cloned.
    pub fn get_tx_id(&self) -> String {
        self.message.txid.clone()
    }

    /// Returns the signed proposal of the chaincode message. This value is being cloned.
    pub fn get_signed_proposal(&self) -> SignedProposal {
        self.message
            .proposal
            .clone()
            .expect("No signed proposal found")
    }

    /// Returns the chaincode event of the chaincode message. This value is being cloned.
    pub fn get_event(&self) -> Option<ChaincodeEvent> {
        self.message.chaincode_event.clone()
    }

    /// Returns the identity of the agent (or user) submitting the transaction.
    pub fn get_creator(&self) -> Vec<u8> {
        let proposal = Proposal::decode(match self.message.proposal.as_ref() {
            Some(proposal) => proposal.proposal_bytes.as_slice(),
            None => return Vec::new(),
        })
        .expect("Invalid proposal bytes");
        let header = Header::decode(proposal.header.as_slice()).expect("Invalid header");
        let signature_header = SignatureHeader::decode(header.signature_header.as_slice())
            .expect("Invalid signature header");
        signature_header.creator
    }

    /// Returns the transient map of the transaction
    pub fn get_transient_map(&self) -> HashMap<String, Vec<u8>> {
        unimplemented!()
    }
}
pub struct RangeResult {
    message_builder: Arc<Mutex<MessageBuilder>>,
    peer_response_queue: Arc<Mutex<Receiver<ChaincodeMessage>>>,
    message: ChaincodeMessage,
    query_response: QueryResponse,
    results: Vec<Vec<u8>>,
    index: usize,
}
impl RangeResult {
    pub fn new(
        message_builder: Arc<Mutex<MessageBuilder>>,
        peer_response_queue: Arc<Mutex<Receiver<ChaincodeMessage>>>,
        message: ChaincodeMessage,
        query_response: QueryResponse,
    ) -> Self {
        let results = query_response
            .results
            .iter()
            .map(|f| Kv::decode(f.result_bytes.as_slice()).expect("Invalid KV"))
            .map(|f| f.value)
            .collect::<Vec<Vec<u8>>>();
        RangeResult {
            message_builder,
            peer_response_queue,
            message,
            query_response,
            results,
            index: 0,
        }
    }
}
impl Iterator for RangeResult {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.results.len() {
            //We still have some results in our buffer
            self.index += 1;
            self.results.get(self.index - 1).cloned()
        } else {
            //our buffer has been iterated through -> check if the node has more
            if self.query_response.has_more {
                //Request more from node
                let payload = QueryStateNext {
                    id: self.query_response.id.clone(),
                }
                .encode_to_vec();
                let message_context = self.message.clone();
                let message_builder = self.message_builder.clone();
                tokio::spawn(async move {
                    message_builder
                        .lock()
                        .await
                        .respond(
                            chaincode_message::Type::QueryStateNext,
                            payload,
                            message_context,
                        )
                        .await;
                });
                loop {
                    match self.peer_response_queue.blocking_lock().try_next() {
                        Ok(Some(response)) => {
                            let query_response = QueryResponse::decode(response.payload.as_slice())
                                .expect("[Context] Invalid query response");
                            self.query_response = query_response;
                            self.index = 1;
                            self.results = self
                                .query_response
                                .results
                                .iter()
                                .map(|f| Kv::decode(f.result_bytes.as_slice()).expect("Invalid KV"))
                                .map(|f| f.value)
                                .collect::<Vec<Vec<u8>>>();
                            return self.results.first().cloned();
                        }
                        Ok(None) => {
                            panic!("[Context] Query Channel is closed")
                        }
                        Err(_) => {}
                    }
                }
            } else {
                None
            }
        }
    }
}