pub use exonum_api::{Deprecated, EndpointMutability, Error, HttpStatusCode, Result};
use actix_web::{
web::{Bytes, Json},
FromRequest, HttpMessage,
};
use exonum::{
blockchain::{Blockchain, Schema as CoreSchema},
crypto::PublicKey,
merkledb::{access::Prefixed, Snapshot},
runtime::{
ArtifactId, BlockchainData, InstanceDescriptor, InstanceState, InstanceStatus, SnapshotExt,
},
};
use exonum_api::{backends::actix, ApiBackend, ApiBuilder, ApiScope, MovedPermanentlyError};
use exonum_proto::ProtobufConvert;
use futures::prelude::*;
use protobuf::Message;
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
use super::Broadcaster;
async fn extract_pb_request<Q>(request: actix::HttpRequest, payload: actix::Payload) -> Result<Q>
where
Q: DeserializeOwned + ProtobufConvert + 'static,
Q::ProtoStruct: Message,
{
match request.content_type() {
"application/json" => Json::from_request(&request, &mut payload.into_inner())
.await
.map(Json::into_inner)
.map_err(|err| {
Error::bad_request()
.title("Cannot read JSON from request body")
.detail(err.to_string())
}),
"application/octet-stream" => {
let bytes = Bytes::from_request(&request, &mut payload.into_inner())
.await
.map_err(|err| {
Error::bad_request()
.title("Cannot read Protobuf from request body")
.detail(err.to_string())
})?;
let mut message = <Q::ProtoStruct as Message>::new();
message.merge_from_bytes(&bytes).map_err(|err| {
Error::bad_request()
.title("Cannot parse Protobuf message")
.detail(err.to_string())
})?;
Q::from_pb(message).map_err(|err| {
Error::bad_request()
.title("Cannot convert Protobuf message")
.detail(err.to_string())
})
}
other => {
let msg = format!(
"Invalid content type: {}. Use `application/json` or `application/octet-stream`",
other
);
Err(Error::bad_request()
.title("Invalid content type")
.detail(msg))
}
}
}
#[derive(Debug)]
pub struct ServiceApiState {
broadcaster: Broadcaster,
snapshot: Box<dyn Snapshot>,
endpoint: String,
status: InstanceStatus,
}
impl ServiceApiState {
fn new<S: Into<String>>(
blockchain: &Blockchain,
instance: InstanceDescriptor,
expected_artifact: &ArtifactId,
endpoint: S,
) -> Result<Self> {
let snapshot = blockchain.snapshot();
let instance_state = snapshot
.for_dispatcher()
.get_instance(instance.id)
.ok_or_else(|| Self::removed_service_error(&instance))?;
Self::check_service_artifact(&instance_state, expected_artifact)?;
let status = instance_state
.status
.ok_or_else(|| Self::removed_service_error(&instance))?;
Ok(Self {
broadcaster: Broadcaster::new(
instance,
blockchain.service_keypair().clone(),
blockchain.sender().clone(),
),
snapshot,
endpoint: endpoint.into(),
status,
})
}
fn removed_service_error(instance: &InstanceDescriptor) -> Error {
let details = format!(
"Service `{}` has been removed from the blockchain services, making it \
impossible to process HTTP handlers",
instance
);
Error::new(HttpStatusCode::INTERNAL_SERVER_ERROR)
.title("Service is gone")
.detail(details)
}
pub fn data(&self) -> BlockchainData<&dyn Snapshot> {
BlockchainData::new(&self.snapshot, &self.instance().name)
}
pub fn service_data(&self) -> Prefixed<&dyn Snapshot> {
self.data().for_executing_service()
}
pub fn snapshot(&self) -> &dyn Snapshot {
&self.snapshot
}
pub fn service_key(&self) -> PublicKey {
self.broadcaster.keypair().public_key()
}
pub fn instance(&self) -> &InstanceDescriptor {
self.broadcaster.instance()
}
pub fn status(&self) -> &InstanceStatus {
&self.status
}
pub fn broadcaster(&self) -> Option<Broadcaster> {
if self.status.is_active() {
CoreSchema::new(&self.snapshot).validator_id(self.service_key())?;
Some(self.broadcaster.clone())
} else {
None
}
}
pub fn generic_broadcaster(&self) -> Broadcaster {
self.broadcaster.clone()
}
pub fn moved_permanently(&self, new_endpoint: &str) -> MovedPermanentlyError {
let new_url = Self::relative_to(&self.endpoint, new_endpoint);
MovedPermanentlyError::new(new_url)
}
fn relative_to(old_endpoint: &str, new_endpoint: &str) -> String {
let endpoint_without_end_slash = old_endpoint.trim_end_matches('/');
let mut nesting_level = endpoint_without_end_slash
.chars()
.filter(|&c| c == '/')
.count();
nesting_level += 1;
let path_to_service_root = "../".repeat(nesting_level);
format!("{}{}", path_to_service_root, new_endpoint)
}
fn check_service_artifact(
instance_state: &InstanceState,
expected_artifact: &ArtifactId,
) -> Result<()> {
let actual_artifact = instance_state.associated_artifact();
if actual_artifact == Some(expected_artifact) {
Ok(())
} else {
let details = format!(
"Service `{}` was upgraded to version {}, making it impossible to continue \
using HTTP handlers from artifact `{}`. Depending on administrative actions, \
the server may be soon rebooted with updated endpoints",
instance_state.spec.as_descriptor(),
instance_state.data_version(),
expected_artifact
);
Err(Error::new(HttpStatusCode::SERVICE_UNAVAILABLE)
.title("Service has been upgraded, but its HTTP handlers are not rebooted yet")
.detail(details))
}
}
}
#[derive(Debug, Clone)]
pub struct ServiceApiScope {
inner: ApiScope,
data: ScopeData,
}
#[derive(Debug, Clone)]
struct ScopeData {
blockchain: Blockchain,
descriptor: InstanceDescriptor,
artifact: ArtifactId,
}
impl ScopeData {
fn wrap<F, I, Q, R>(
&self,
name: &str,
handler: &F,
query: Q,
) -> impl Future<Output = exonum_api::Result<I>>
where
F: Fn(ServiceApiState, Q) -> R + 'static,
R: Future<Output = exonum_api::Result<I>>,
{
let maybe_state = ServiceApiState::new(
&self.blockchain,
self.descriptor.clone(),
&self.artifact,
name,
);
let state = match maybe_state {
Ok(state) => state,
Err(err) => return future::err(err).left_future(),
};
let descriptor = self.descriptor.clone();
handler(state, query)
.map_err(move |err| err.source(descriptor.to_string()))
.right_future()
}
}
impl ServiceApiScope {
fn new(blockchain: Blockchain, descriptor: InstanceDescriptor, artifact: ArtifactId) -> Self {
Self {
inner: ApiScope::new(),
data: ScopeData {
blockchain,
descriptor,
artifact,
},
}
}
pub fn endpoint<Q, I, F, R>(&mut self, name: &'static str, handler: F) -> &mut Self
where
Q: DeserializeOwned + 'static + Send,
I: Serialize + 'static,
F: Fn(ServiceApiState, Q) -> R + 'static + Clone + Send + Sync,
R: Future<Output = exonum_api::Result<I>>,
{
let data = self.data.clone();
self.inner
.endpoint(name, move |query: Q| data.wrap(name, &handler, query));
self
}
pub fn endpoint_mut<Q, I, F, R>(&mut self, name: &'static str, handler: F) -> &mut Self
where
Q: DeserializeOwned + 'static,
I: Serialize + 'static,
F: Fn(ServiceApiState, Q) -> R + 'static + Clone + Send + Sync,
R: Future<Output = exonum_api::Result<I>>,
{
let data = self.data.clone();
self.inner
.endpoint_mut(name, move |query: Q| data.wrap(name, &handler, query));
self
}
pub fn pb_endpoint_mut<Q, I, F, R>(&mut self, name: &'static str, handler: F) -> &mut Self
where
Q: DeserializeOwned + ProtobufConvert + 'static,
Q::ProtoStruct: Message,
I: Serialize + 'static,
F: Fn(ServiceApiState, Q) -> R + 'static + Clone + Send + Sync,
R: Future<Output = exonum_api::Result<I>>,
{
let data = self.data.clone();
let raw_handler = move |http_request, payload| {
let data = data.clone();
let handler = handler.clone();
async move {
let query: Q = extract_pb_request(http_request, payload).await?;
let response = data.wrap(name, &handler, query).await?;
Ok(actix::HttpResponse::Ok().json(response))
}
.boxed_local()
};
let raw_handler = actix::RequestHandler {
name: name.to_owned(),
method: actix::HttpMethod::POST,
inner: Arc::new(raw_handler),
};
self.inner.web_backend().raw_handler(raw_handler);
self
}
pub fn deprecated_endpoint<Q, I, F, R>(
&mut self,
name: &'static str,
deprecated: Deprecated<Q, I, R, F>,
) -> &mut Self
where
Q: DeserializeOwned + 'static,
I: Serialize + 'static,
F: Fn(ServiceApiState, Q) -> R + 'static + Clone + Send + Sync,
R: Future<Output = exonum_api::Result<I>>,
{
let data = self.data.clone();
let handler = deprecated.handler.clone();
let full_handler = move |query: Q| data.wrap(name, &handler, query);
let handler = deprecated.with_different_handler(full_handler);
self.inner.endpoint(name, handler);
self
}
pub fn deprecated_endpoint_mut<Q, I, F, R>(
&mut self,
name: &'static str,
deprecated: Deprecated<Q, I, R, F>,
) -> &mut Self
where
Q: DeserializeOwned + 'static,
I: Serialize + 'static,
F: Fn(ServiceApiState, Q) -> R + 'static + Clone + Send + Sync,
R: Future<Output = exonum_api::Result<I>>,
{
let data = self.data.clone();
let handler = deprecated.handler.clone();
let full_handler = move |query: Q| data.wrap(name, &handler, query);
let handler = deprecated.with_different_handler(full_handler);
self.inner.endpoint_mut(name, handler);
self
}
pub fn web_backend(&mut self) -> &mut actix::ApiBuilder {
self.inner.web_backend()
}
}
#[derive(Debug)]
pub struct ServiceApiBuilder {
blockchain: Blockchain,
public_scope: ServiceApiScope,
private_scope: ServiceApiScope,
root_path: Option<String>,
}
impl ServiceApiBuilder {
pub(crate) fn new(
blockchain: Blockchain,
instance: InstanceDescriptor,
artifact: ArtifactId,
) -> Self {
Self {
blockchain: blockchain.clone(),
public_scope: ServiceApiScope::new(
blockchain.clone(),
instance.clone(),
artifact.clone(),
),
private_scope: ServiceApiScope::new(blockchain, instance, artifact),
root_path: None,
}
}
pub fn public_scope(&mut self) -> &mut ServiceApiScope {
&mut self.public_scope
}
pub fn private_scope(&mut self) -> &mut ServiceApiScope {
&mut self.private_scope
}
pub fn blockchain(&self) -> &Blockchain {
&self.blockchain
}
#[doc(hidden)]
pub fn with_root_path(&mut self, root_path: impl Into<String>) -> &mut Self {
let root_path = root_path.into();
self.root_path = Some(root_path);
self
}
pub(super) fn take_root_path(&mut self) -> Option<String> {
self.root_path.take()
}
}
impl From<ServiceApiBuilder> for ApiBuilder {
fn from(inner: ServiceApiBuilder) -> Self {
Self {
public_scope: inner.public_scope.inner,
private_scope: inner.private_scope.inner,
}
}
}