use std::pin::Pin;
use std::time::Duration;
use futures::prelude::*;
use sawtooth_sdk::messages::batch::Batch;
use sawtooth_sdk::messages::client_batch_submit::{
ClientBatchStatusRequest, ClientBatchStatusResponse, ClientBatchStatusResponse_Status,
ClientBatchSubmitRequest, ClientBatchSubmitResponse, ClientBatchSubmitResponse_Status,
};
use sawtooth_sdk::messages::validator::Message_MessageType;
use sawtooth_sdk::messaging::stream::MessageSender;
use sawtooth_sdk::messaging::zmq_stream::ZmqMessageSender;
use uuid::Uuid;
use super::{
BackendClient, BackendClientError, BatchStatus, BatchStatusLink, BatchStatuses, SubmitBatches,
DEFAULT_TIME_OUT,
};
#[derive(Clone)]
pub struct SawtoothBackendClient {
sender: ZmqMessageSender,
}
impl SawtoothBackendClient {
pub fn new(sender: ZmqMessageSender) -> Self {
Self { sender }
}
}
macro_rules! try_fut {
($try_expr:expr) => {
match $try_expr {
Ok(res) => res,
Err(err) => return futures::future::err(err).boxed(),
}
};
}
impl BackendClient for SawtoothBackendClient {
fn submit_batches(
&self,
msg: SubmitBatches,
) -> Pin<Box<dyn Future<Output = Result<BatchStatusLink, BackendClientError>> + Send>> {
let mut client_submit_request = ClientBatchSubmitRequest::new();
client_submit_request.set_batches(protobuf::RepeatedField::from_vec(
msg.batch_list.get_batches().to_vec(),
));
let response_status: ClientBatchSubmitResponse = try_fut!(query_validator(
&self.sender,
Message_MessageType::CLIENT_BATCH_SUBMIT_REQUEST,
&client_submit_request,
));
future::ready(
process_validator_response(response_status.get_status()).map(|_| {
let batch_query = msg
.batch_list
.get_batches()
.iter()
.map(Batch::get_header_signature)
.collect::<Vec<_>>()
.join(",");
let mut response_url = msg.response_url;
response_url.set_query(Some(&format!("id={}", batch_query)));
BatchStatusLink {
link: response_url.to_string(),
}
}),
)
.boxed()
}
fn batch_status(
&self,
msg: BatchStatuses,
) -> Pin<Box<dyn Future<Output = Result<Vec<BatchStatus>, BackendClientError>> + Send>> {
let mut batch_status_request = ClientBatchStatusRequest::new();
batch_status_request.set_batch_ids(protobuf::RepeatedField::from_vec(msg.batch_ids));
match msg.wait {
Some(wait_time) => {
batch_status_request.set_wait(true);
batch_status_request.set_timeout(wait_time);
}
None => {
batch_status_request.set_wait(false);
}
}
let response_status: ClientBatchStatusResponse = try_fut!(query_validator(
&self.sender,
Message_MessageType::CLIENT_BATCH_STATUS_REQUEST,
&batch_status_request,
));
future::ready(process_batch_status_response(response_status)).boxed()
}
fn clone_box(&self) -> Box<dyn BackendClient> {
Box::new(self.clone())
}
}
pub fn query_validator<T: protobuf::Message, C: protobuf::Message, MS: MessageSender>(
sender: &MS,
message_type: Message_MessageType,
message: &C,
) -> Result<T, BackendClientError> {
let content = protobuf::Message::write_to_bytes(message).map_err(|err| {
BackendClientError::BadRequestError(format!(
"Failed to serialize batch submit request. {}",
err
))
})?;
let correlation_id = Uuid::new_v4().to_string();
let mut response_future = sender
.send(message_type, &correlation_id, &content)
.map_err(|err| {
BackendClientError::ConnectionError(format!(
"Failed to send message to validator. {}",
err
))
})?;
protobuf::Message::parse_from_bytes(
response_future
.get_timeout(Duration::new(DEFAULT_TIME_OUT.into(), 0))
.map_err(|err| BackendClientError::InternalError(err.to_string()))?
.get_content(),
)
.map_err(|err| {
BackendClientError::InternalError(format!(
"Failed to parse validator response from bytes. {}",
err
))
})
}
pub fn process_validator_response(
status: ClientBatchSubmitResponse_Status,
) -> Result<(), BackendClientError> {
match status {
ClientBatchSubmitResponse_Status::OK => Ok(()),
ClientBatchSubmitResponse_Status::INVALID_BATCH => {
Err(BackendClientError::BadRequestError(
"The submitted BatchList was rejected by the validator. It was '
'poorly formed, or has an invalid signature."
.to_string(),
))
}
_ => Err(BackendClientError::InternalError(format!(
"Validator responded with error {:?}",
status
))),
}
}
pub fn process_batch_status_response(
response: ClientBatchStatusResponse,
) -> Result<Vec<BatchStatus>, BackendClientError> {
let status = response.get_status();
match status {
ClientBatchStatusResponse_Status::OK => Ok(response
.get_batch_statuses()
.iter()
.map(BatchStatus::from_proto)
.collect()),
ClientBatchStatusResponse_Status::INVALID_ID => Err(BackendClientError::BadRequestError(
"Blockchain items are identified by 128 character hex-strings. A submitted \
batch id was invalid"
.to_string(),
)),
_ => Err(BackendClientError::InternalError(format!(
"Validator responded with error {:?}",
status
))),
}
}