use std::{collections::HashMap, sync::Arc};
use futures_channel::mpsc::Receiver;
use futures_util::StreamExt;
use prost::Message;
use tokio::sync::Mutex;
use crate::{
chaincode::message::MessageBuilder,
fabric::{
common::{ChannelHeader, Header, SignatureHeader},
protos::{
ChaincodeEvent, ChaincodeMessage, DelState, GetState, GetStateByRange, Proposal,
PutState, QueryResponse, QueryStateNext, SignedProposal, chaincode_message,
},
queryresult::Kv,
},
};
/// Start key (the single character U+0001) that `Context::get_state_by_range`
/// sends in place of an empty `start_key`.
static UNSPECIFIED_START_KEY: &str = "\u{0001}";
/// Handle through which chaincode code issues state requests for one incoming
/// message and reads values out of that message.
///
/// Cloning copies the stored message and shares the builder and the response
/// queue (both are behind `Arc<Mutex<_>>`).
#[derive(Clone)]
pub struct Context {
/// Shared builder used to send requests (via its `respond` method).
pub(crate) message_builder: Arc<Mutex<MessageBuilder>>,
/// Shared receiver from which the reply to each request is read.
pub(crate) peer_response_queue: Arc<Mutex<Receiver<ChaincodeMessage>>>,
/// The message this context was created for; a clone is passed along with
/// every outgoing request.
pub(crate) message: ChaincodeMessage,
}
impl Context {
pub(crate) fn new(
message_builder: Arc<Mutex<MessageBuilder>>,
message: ChaincodeMessage,
peer_response_queue: Arc<Mutex<Receiver<ChaincodeMessage>>>,
) -> Context {
Context {
message_builder,
message,
peer_response_queue,
}
}
pub async fn get_state(&self, key: &str) -> Vec<u8> {
let payload = GetState {
key: key.to_string(),
collection: String::new(), }
.encode_to_vec();
let message_context = self.message.clone();
self.message_builder
.lock()
.await
.respond(chaincode_message::Type::GetState, payload, message_context)
.await;
self.peer_response_queue
.lock()
.await
.next()
.await
.expect("[Context] Failed to receive response from channel")
.payload
}
pub async fn get_state_string(&self, key: &str) -> String {
String::from_utf8(self.get_state(key).await).expect("[Context] Invalid UTF-8 encoding")
}
pub async fn get_state_by_range(&self, start_key: &str, end_key: &str) -> RangeResult {
let start_key = if start_key.is_empty() {
UNSPECIFIED_START_KEY
} else {
start_key
};
let payload = GetStateByRange {
start_key: start_key.to_string(),
end_key: end_key.to_string(),
collection: String::new(), metadata: vec![],
}
.encode_to_vec();
let message_context = self.message.clone();
self.message_builder
.lock()
.await
.respond(
chaincode_message::Type::GetStateByRange,
payload,
message_context,
)
.await;
let response = self
.peer_response_queue
.lock()
.await
.next()
.await
.expect("[Context] Failed to receive response from channel");
let query_response = QueryResponse::decode(response.payload.as_slice())
.expect("[Context] Invalid query response");
RangeResult::new(
self.message_builder.clone(),
self.peer_response_queue.clone(),
self.message.clone(),
query_response,
)
}
pub async fn put_state(&self, key: &str, value: Vec<u8>) {
let payload = PutState {
key: key.to_string(),
value,
collection: String::new(), }
.encode_to_vec();
let message_context = self.message.clone();
self.message_builder
.lock()
.await
.respond(chaincode_message::Type::PutState, payload, message_context)
.await;
self.peer_response_queue
.lock()
.await
.next()
.await
.expect("[Context] Failed to receive response from channel");
}
pub async fn put_state_string(&self, key: &str, value: &str) {
self.put_state(key, value.as_bytes().to_vec()).await;
}
pub async fn del_state(&self, key: &str) {
let payload = DelState {
key: key.to_string(),
collection: String::new(), }
.encode_to_vec();
let message_context = self.message.clone();
self.message_builder
.lock()
.await
.respond(chaincode_message::Type::DelState, payload, message_context)
.await;
self.peer_response_queue
.lock()
.await
.next()
.await
.expect("[Context] Failed to receive response from channel");
}
pub fn get_tx_timestamp(&self) -> i64 {
let proposal = Proposal::decode(
self.message
.proposal
.as_ref()
.expect("No signed proposal")
.proposal_bytes
.as_slice(),
)
.expect("Invalid proposal bytes");
let header = Header::decode(proposal.header.as_slice()).expect("Invalid header");
let channel_header = ChannelHeader::decode(header.channel_header.as_slice())
.expect("Invalid channel header");
channel_header.timestamp.expect("No timestamp").seconds
}
pub fn get_channel_id(&self) -> String {
self.message.channel_id.clone()
}
pub fn get_tx_id(&self) -> String {
self.message.txid.clone()
}
pub fn get_signed_proposal(&self) -> SignedProposal {
self.message
.proposal
.clone()
.expect("No signed proposal found")
}
pub fn get_event(&self) -> Option<ChaincodeEvent> {
self.message.chaincode_event.clone()
}
pub fn get_creator(&self) -> Vec<u8> {
let proposal = Proposal::decode(match self.message.proposal.as_ref() {
Some(proposal) => proposal.proposal_bytes.as_slice(),
None => return Vec::new(),
})
.expect("Invalid proposal bytes");
let header = Header::decode(proposal.header.as_slice()).expect("Invalid header");
let signature_header = SignatureHeader::decode(header.signature_header.as_slice())
.expect("Invalid signature header");
signature_header.creator
}
pub fn get_transient_map(&self) -> HashMap<String, Vec<u8>> {
unimplemented!()
}
}
/// Iterator over the values produced by a range query, fetching further pages
/// on demand while the latest response reports `has_more`.
pub struct RangeResult {
/// Shared builder used to send the follow-up `QueryStateNext` requests.
message_builder: Arc<Mutex<MessageBuilder>>,
/// Shared receiver from which follow-up responses are read.
peer_response_queue: Arc<Mutex<Receiver<ChaincodeMessage>>>,
/// Message passed along with every follow-up request.
message: ChaincodeMessage,
/// Most recently received response; supplies `id` and `has_more`.
query_response: QueryResponse,
/// Decoded values of the current page.
results: Vec<Vec<u8>>,
/// Position in `results` of the next value to yield.
index: usize,
}
impl RangeResult {
    /// Wraps the first page of a range query.
    ///
    /// Every entry of `query_response.results` is decoded as a `Kv` and only
    /// its `value` is kept. The builder, queue and message are stored so that
    /// later pages can be requested during iteration.
    ///
    /// # Panics
    ///
    /// Panics if any entry's `result_bytes` does not decode as a `Kv`.
    pub fn new(
        message_builder: Arc<Mutex<MessageBuilder>>,
        peer_response_queue: Arc<Mutex<Receiver<ChaincodeMessage>>>,
        message: ChaincodeMessage,
        query_response: QueryResponse,
    ) -> Self {
        let mut results: Vec<Vec<u8>> = Vec::with_capacity(query_response.results.len());
        for entry in &query_response.results {
            let kv = Kv::decode(entry.result_bytes.as_slice()).expect("Invalid KV");
            results.push(kv.value);
        }
        Self {
            message_builder,
            peer_response_queue,
            message,
            query_response,
            results,
            index: 0,
        }
    }
}
impl Iterator for RangeResult {
    type Item = Vec<u8>;

    /// Yields the next value of the range query.
    ///
    /// Values of the current page are returned first. Once the page is used up
    /// and the last response reported `has_more`, a `QueryStateNext` request is
    /// sent from a spawned Tokio task and this call waits synchronously for the
    /// next message on the response queue, which becomes the new current page.
    /// Returns `None` when the page is used up and `has_more` is false, or when
    /// a freshly fetched page contains no entries.
    ///
    /// # Panics
    ///
    /// Panics if the response queue is closed, or if the response or one of
    /// its entries fails to decode. Per the Tokio documentation,
    /// `tokio::spawn` panics outside a Tokio runtime context and
    /// `Mutex::blocking_lock` panics inside an asynchronous execution context,
    /// so fetching a further page must happen on a blocking thread that still
    /// has a runtime context.
    fn next(&mut self) -> Option<Self::Item> {
        // Fast path: the current page still holds unread values.
        if let Some(value) = self.results.get(self.index).cloned() {
            self.index += 1;
            return Some(value);
        }
        if !self.query_response.has_more {
            return None;
        }

        // Request the next page; sending is async, so it runs on its own task.
        let payload = QueryStateNext {
            id: self.query_response.id.clone(),
        }
        .encode_to_vec();
        let message_context = self.message.clone();
        let message_builder = self.message_builder.clone();
        tokio::spawn(async move {
            message_builder
                .lock()
                .await
                .respond(
                    chaincode_message::Type::QueryStateNext,
                    payload,
                    message_context,
                )
                .await;
        });

        loop {
            // Take the lock only for the poll itself so it is not held while
            // waiting or while decoding the response.
            let polled = self.peer_response_queue.blocking_lock().try_next();
            match polled {
                Ok(Some(response)) => {
                    self.query_response = QueryResponse::decode(response.payload.as_slice())
                        .expect("[Context] Invalid query response");
                    self.results = self
                        .query_response
                        .results
                        .iter()
                        .map(|f| Kv::decode(f.result_bytes.as_slice()).expect("Invalid KV"))
                        .map(|f| f.value)
                        .collect::<Vec<Vec<u8>>>();
                    // The first value of the new page is returned right away,
                    // so the cursor starts at the second one.
                    self.index = 1;
                    return self.results.first().cloned();
                }
                Ok(None) => {
                    panic!("[Context] Query Channel is closed")
                }
                // Nothing available yet: let other threads run instead of
                // spinning at full speed.
                Err(_) => std::thread::yield_now(),
            }
        }
    }
}