use actix::Arbiter;
use actix_web::{http, ws};
use chrono::{DateTime, Utc};
use futures::IntoFuture;
use std::ops::Range;
use std::sync::{Arc, Mutex};
use std::time::UNIX_EPOCH;
use crate::{
api::{
backends::actix::{
self as actix_backend, FutureResponse, HttpRequest, RawHandler, RequestHandler,
},
websocket::{Server, Session},
Error as ApiError, ServiceApiBackend, ServiceApiScope, ServiceApiState,
},
blockchain::{Block, SharedNodeState},
crypto::Hash,
explorer::{self, BlockchainExplorer, TransactionInfo},
helpers::Height,
messages::{Message, Precommit, RawTransaction, Signed, SignedMessage},
};
/// Largest `BlocksQuery::count` accepted by `ExplorerApi::blocks`; requests asking
/// for more blocks are rejected with `ApiError::BadRequest`.
pub const MAX_BLOCKS_PER_REQUEST: usize = 1000;
/// Response of `ExplorerApi::blocks`: the block headers that were found together
/// with the height range that was scanned to find them.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct BlocksRange {
/// Half-open range of heights covered by the response. The end is the height
/// following the upper bound of the search; the start is the height of the last
/// returned block, or `Height(0)` when fewer blocks than requested were found.
pub range: Range<Height>,
/// Headers of the selected blocks, ordered from the highest height downwards.
pub blocks: Vec<Block>,
/// Median precommit time of every returned block, in the same order as `blocks`.
/// `None` unless the query set `add_blocks_time`.
pub times: Option<Vec<DateTime<Utc>>>,
}
/// Response of `ExplorerApi::block`: a single block with its precommits and
/// transaction hashes.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct BlockInfo {
/// Header of the block.
pub block: Block,
/// Precommit messages stored for the block.
pub precommits: Vec<Signed<Precommit>>,
/// Hashes of the transactions included in the block.
pub txs: Vec<Hash>,
/// Median of the precommit times (the Unix epoch when there are no precommits).
pub time: DateTime<Utc>,
}
/// Query parameters accepted by `ExplorerApi::blocks`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Default)]
pub struct BlocksQuery {
/// Maximum number of blocks to return; must not exceed `MAX_BLOCKS_PER_REQUEST`.
pub count: usize,
/// Highest block height to consider (inclusive). When omitted, the search
/// starts from the current height reported by the explorer.
pub latest: Option<Height>,
/// When `true`, blocks for which `is_empty()` holds are left out of the response.
/// Defaults to `false` if absent from the request.
#[serde(default)]
pub skip_empty_blocks: bool,
/// When `true`, the response also carries the median precommit time of each block.
/// Defaults to `false` if absent from the request.
#[serde(default)]
pub add_blocks_time: bool,
}
/// Query parameters accepted by `ExplorerApi::block`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct BlockQuery {
/// Height of the requested block.
pub height: Height,
}
impl BlockQuery {
pub fn new(height: Height) -> Self {
Self { height }
}
}
/// Request body accepted by `ExplorerApi::add_transaction`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TransactionHex {
/// Hex-encoded bytes of the signed transaction message.
pub tx_body: String,
}
/// Response of `ExplorerApi::add_transaction`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct TransactionResponse {
/// Hash of the signed message that was submitted.
pub tx_hash: Hash,
}
/// Query parameters accepted by `ExplorerApi::transaction_info`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
pub struct TransactionQuery {
/// Hash of the transaction to look up.
pub hash: Hash,
}
impl TransactionQuery {
pub fn new(hash: Hash) -> Self {
Self { hash }
}
}
/// Namespace type for the explorer endpoints; it carries no state and all of its
/// handlers are associated functions.
#[derive(Debug, Clone, Copy)]
pub struct ExplorerApi;
impl ExplorerApi {
/// Returns up to `query.count` block headers, walking from the upper bound
/// (`query.latest`, or the current height when it is not given) downwards.
///
/// Empty blocks are skipped when `query.skip_empty_blocks` is set, and the
/// median precommit time of every returned block is attached when
/// `query.add_blocks_time` is set.
///
/// # Errors
///
/// Returns `ApiError::BadRequest` if `query.count` exceeds
/// `MAX_BLOCKS_PER_REQUEST`.
pub fn blocks(state: &ServiceApiState, query: BlocksQuery) -> Result<BlocksRange, ApiError> {
    let explorer = BlockchainExplorer::new(state.blockchain());
    if query.count > MAX_BLOCKS_PER_REQUEST {
        let message = format!(
            "Max block count per request exceeded ({})",
            MAX_BLOCKS_PER_REQUEST
        );
        return Err(ApiError::BadRequest(message));
    }

    let (upper, blocks_iter) = match query.latest {
        Some(latest) => (latest, explorer.blocks(..latest.next())),
        None => (explorer.height(), explorer.blocks(..)),
    };

    // `Some(..)` only when the caller asked for block times; filled while collecting.
    let mut times = if query.add_blocks_time {
        Some(Vec::new())
    } else {
        None
    };

    let blocks: Vec<_> = blocks_iter
        .rev()
        .filter(|block| !(query.skip_empty_blocks && block.is_empty()))
        .take(query.count)
        .map(|block| {
            if let Some(collected) = times.as_mut() {
                collected.push(median_precommits_time(&block.precommits()));
            }
            block.into_header()
        })
        .collect();

    // If the scan ran out of blocks before reaching `count`, it went all the way
    // down to the genesis block; otherwise it stopped at the last returned block.
    let lower = match blocks.last() {
        Some(oldest) if blocks.len() >= query.count => oldest.height(),
        _ => Height(0),
    };

    Ok(BlocksRange {
        range: lower..upper.next(),
        blocks,
        times,
    })
}
pub fn block(state: &ServiceApiState, query: BlockQuery) -> Result<BlockInfo, ApiError> {
BlockchainExplorer::new(state.blockchain())
.block(query.height)
.map(From::from)
.ok_or_else(|| {
ApiError::NotFound(format!("Block for height: {} not found", query.height))
})
}
/// Looks up the transaction identified by `query.hash`.
///
/// # Errors
///
/// Returns `ApiError::NotFound` carrying the JSON description
/// `{"type":"unknown"}` when the explorer does not know the transaction; the
/// description is also written to the debug log.
pub fn transaction_info(
    state: &ServiceApiState,
    query: TransactionQuery,
) -> Result<TransactionInfo, ApiError> {
    let explorer = BlockchainExplorer::new(state.blockchain());
    let found = explorer.transaction(&query.hash);
    match found {
        Some(info) => Ok(info),
        None => {
            let description = serde_json::to_string(&json!({ "type": "unknown" })).unwrap();
            debug!("{}", description);
            Err(ApiError::NotFound(description))
        }
    }
}
/// Decodes a hex-encoded signed transaction and hands it to the node for
/// broadcasting.
///
/// On success the response contains the hash of the submitted signed message.
///
/// # Errors
///
/// Returns an error if
/// - `query.tx_body` is not valid hex,
/// - the decoded bytes are not a valid signed message,
/// - the message cannot be deserialized as a transaction, or
/// - the node fails to accept the transaction for broadcasting.
pub fn add_transaction(
    state: &ServiceApiState,
    query: TransactionHex,
) -> Result<TransactionResponse, ApiError> {
    use crate::events::error::into_failure;
    use crate::messages::ProtocolMessage;

    let buf: Vec<u8> = ::hex::decode(query.tx_body).map_err(into_failure)?;
    let signed = SignedMessage::from_raw_buffer(buf)?;
    // The hash must be taken before `signed` is consumed by deserialization.
    let tx_hash = signed.hash();
    let signed = RawTransaction::try_from(Message::deserialize(signed)?)
        .map_err(|_| format_err!("Couldn't deserialize transaction message."))?;

    // Propagate broadcast failures instead of discarding them: the client must
    // not be told the transaction was accepted when it never reached the node.
    state
        .sender()
        .broadcast_transaction(signed)
        .map_err(ApiError::from)?;

    Ok(TransactionResponse { tx_hash })
}
/// Registers a raw `GET` handler under `name` that upgrades incoming requests to
/// WebSocket sessions.
///
/// The broadcast `Server` actor is started on the first request only; its
/// address is stored behind a mutex, passed to
/// `shared_node_state.set_broadcast_server_address`, and reused for every
/// later session.
///
/// # Panics
///
/// The handler panics if the mutex guarding the server address is poisoned.
pub fn handle_subscribe(
    name: &'static str,
    backend: &mut actix_backend::ApiBuilder,
    service_api_state: ServiceApiState,
    shared_node_state: SharedNodeState,
) {
    let server = Arc::new(Mutex::new(None));
    let service_api_state = Arc::new(service_api_state);

    let index = move |req: HttpRequest| -> FutureResponse {
        // The guard is held until the handler returns, so concurrent first
        // requests cannot start two servers.
        let mut slot = server.lock().expect("Expected mutex lock");
        if slot.is_none() {
            let state = Arc::clone(&service_api_state);
            let started = Arbiter::start(move |_| Server::new(state));
            shared_node_state.set_broadcast_server_address(started.clone());
            *slot = Some(started);
        }
        let session = Session::new((*slot).clone().unwrap());
        Box::new(ws::start(&req, session).into_future())
    };

    backend.raw_handler(RequestHandler {
        name: name.to_owned(),
        method: http::Method::GET,
        inner: Arc::from(index) as Arc<RawHandler>,
    });
}
/// Registers all explorer endpoints on `api_scope` and returns the scope so
/// that further registrations can be chained.
///
/// The WebSocket subscription handler (`v1/blocks/subscribe`) is attached to
/// the scope's web backend first, followed by the `v1/blocks`, `v1/block` and
/// `v1/transactions` endpoints.
pub fn wire(
    api_scope: &mut ServiceApiScope,
    service_api_state: ServiceApiState,
    shared_node_state: SharedNodeState,
) -> &mut ServiceApiScope {
    {
        let backend = api_scope.web_backend();
        Self::handle_subscribe(
            "v1/blocks/subscribe",
            backend,
            service_api_state,
            shared_node_state,
        );
    }

    api_scope
        .endpoint("v1/blocks", Self::blocks)
        .endpoint("v1/block", Self::block)
        .endpoint("v1/transactions", Self::transaction_info)
        .endpoint_mut("v1/transactions", Self::add_transaction)
}
}
impl<'a> From<explorer::BlockInfo<'a>> for BlockInfo {
fn from(inner: explorer::BlockInfo<'a>) -> Self {
Self {
block: inner.header().clone(),
precommits: inner.precommits().to_vec(),
txs: inner.transaction_hashes().to_vec(),
time: median_precommits_time(&inner.precommits()),
}
}
}
/// Returns the median of the times carried by `precommits`.
///
/// With an even number of precommits the later of the two middle values is
/// chosen. An empty slice yields the Unix epoch.
fn median_precommits_time(precommits: &[Signed<Precommit>]) -> DateTime<Utc> {
    let mut timestamps: Vec<_> = precommits
        .iter()
        .map(|precommit| precommit.time())
        .collect();
    if timestamps.is_empty() {
        return UNIX_EPOCH.into();
    }
    timestamps.sort();
    let middle = timestamps.len() / 2;
    timestamps[middle]
}