use crate::requests::DataRequest::{
GetEpochEndingLedgerInfos, GetNewTransactionOutputsWithProof, GetNewTransactionsWithProof,
GetNumberOfStatesAtVersion, GetServerProtocolVersion, GetStateValuesWithProof,
GetStorageServerSummary, GetTransactionOutputsWithProof, GetTransactionsWithProof,
};
use crate::responses::Error::DegenerateRangeError;
use crate::{Epoch, StorageServiceRequest, COMPRESSION_SUFFIX_LABEL};
use aptos_compression::metrics::CompressionClient;
use aptos_compression::{CompressedData, CompressionError};
use aptos_config::config::StorageServiceConfig;
use aptos_types::epoch_change::EpochChangeProof;
use aptos_types::ledger_info::LedgerInfoWithSignatures;
use aptos_types::state_store::state_value::StateValueChunkWithProof;
use aptos_types::transaction::{TransactionListWithProof, TransactionOutputListWithProof, Version};
use num_traits::{PrimInt, Zero};
#[cfg(test)]
use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy};
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::fmt::{Display, Formatter};
use thiserror::Error;
/// Errors produced while building, decoding, or converting storage service responses.
#[derive(Clone, Debug, Deserialize, Error, PartialEq, Serialize)]
pub enum Error {
/// A `CompleteDataRange` could not be built or measured: its bounds are inverted,
/// or its length does not fit in the underlying integer type.
#[error("Data range cannot be degenerate!")]
DegenerateRangeError,
/// A serialization, deserialization, compression, or decompression step failed.
/// Carries the display text of the underlying error.
#[error("Unexpected error encountered: {0}")]
UnexpectedErrorEncountered(String),
/// A response held a different kind of data than the one the caller tried to extract.
#[error("Unexpected response error: {0}")]
UnexpectedResponseError(String),
}
impl From<CompressionError> for Error {
fn from(error: CompressionError) -> Self {
Error::UnexpectedErrorEncountered(error.to_string())
}
}
/// A response produced by the storage service, carried either compressed or raw.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[allow(clippy::large_enum_variant)]
pub enum StorageServiceResponse {
    /// A label for the response together with the compressed, serialized data response.
    CompressedResponse(String, CompressedData),
    /// The data response stored as-is, without compression.
    RawResponse(DataResponse),
}
impl StorageServiceResponse {
pub fn new(data_response: DataResponse, perform_compression: bool) -> Result<Self, Error> {
if perform_compression {
let raw_data = bcs::to_bytes(&data_response)
.map_err(|error| Error::UnexpectedErrorEncountered(error.to_string()))?;
let compressed_data =
aptos_compression::compress(raw_data, CompressionClient::StateSync)?;
let label = data_response.get_label().to_string() + COMPRESSION_SUFFIX_LABEL;
Ok(StorageServiceResponse::CompressedResponse(
label,
compressed_data,
))
} else {
Ok(StorageServiceResponse::RawResponse(data_response))
}
}
pub fn get_data_response(&self) -> Result<DataResponse, Error> {
match self {
StorageServiceResponse::CompressedResponse(_, compressed_data) => {
let raw_data =
aptos_compression::decompress(compressed_data, CompressionClient::StateSync)?;
let data_response = bcs::from_bytes::<DataResponse>(&raw_data)
.map_err(|error| Error::UnexpectedErrorEncountered(error.to_string()))?;
Ok(data_response)
}
StorageServiceResponse::RawResponse(data_response) => Ok(data_response.clone()),
}
}
pub fn get_label(&self) -> String {
match self {
StorageServiceResponse::CompressedResponse(label, _) => label.clone(),
StorageServiceResponse::RawResponse(data_response) => {
data_response.get_label().to_string()
}
}
}
pub fn is_compressed(&self) -> bool {
matches!(self, Self::CompressedResponse(_, _))
}
}
/// The data carried by a storage service response, one variant per kind of data.
///
/// NOTE(review): this type is serialized (see the `Serialize`/`Deserialize` derives),
/// so reordering or renaming variants presumably changes the encoded form — confirm
/// before touching the variant list.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[allow(clippy::large_enum_variant)]
pub enum DataResponse {
/// Epoch ending ledger infos, carried as an epoch change proof.
EpochEndingLedgerInfos(EpochChangeProof),
/// New transaction outputs with proof, paired with a ledger info with signatures.
NewTransactionOutputsWithProof((TransactionOutputListWithProof, LedgerInfoWithSignatures)),
/// New transactions with proof, paired with a ledger info with signatures.
NewTransactionsWithProof((TransactionListWithProof, LedgerInfoWithSignatures)),
/// The number of states at a version.
NumberOfStatesAtVersion(u64),
/// The protocol version of the server.
ServerProtocolVersion(ServerProtocolVersion),
/// A chunk of state values with proof.
StateValueChunkWithProof(StateValueChunkWithProof),
/// A summary of the storage server (protocol metadata and data summary).
StorageServerSummary(StorageServerSummary),
/// A list of transaction outputs with proof.
TransactionOutputsWithProof(TransactionOutputListWithProof),
/// A list of transactions with proof.
TransactionsWithProof(TransactionListWithProof),
}
impl DataResponse {
pub fn get_label(&self) -> &'static str {
match self {
Self::EpochEndingLedgerInfos(_) => "epoch_ending_ledger_infos",
Self::NewTransactionOutputsWithProof(_) => "new_transaction_outputs_with_proof",
Self::NewTransactionsWithProof(_) => "new_transactions_with_proof",
Self::NumberOfStatesAtVersion(_) => "number_of_states_at_version",
Self::ServerProtocolVersion(_) => "server_protocol_version",
Self::StateValueChunkWithProof(_) => "state_value_chunk_with_proof",
Self::StorageServerSummary(_) => "storage_server_summary",
Self::TransactionOutputsWithProof(_) => "transaction_outputs_with_proof",
Self::TransactionsWithProof(_) => "transactions_with_proof",
}
}
}
/// Formats the response as its label plus a short rendering of its data.
///
/// Only a storage server summary is printed in full (using its `Debug` output);
/// every other kind of data is elided as `...`.
impl Display for DataResponse {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let rendered_data = if let DataResponse::StorageServerSummary(summary) = self {
            format!("{:?}", summary)
        } else {
            String::from("...")
        };
        let label = self.get_label();
        write!(
            f,
            "Storage service response: {}, data: {}",
            label, rendered_data
        )
    }
}
impl TryFrom<StorageServiceResponse> for StateValueChunkWithProof {
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::StateValueChunkWithProof(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected state_value_chunk_with_proof, found {}",
data_response.get_label()
))),
}
}
}
impl TryFrom<StorageServiceResponse> for EpochChangeProof {
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::EpochEndingLedgerInfos(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected epoch_ending_ledger_infos, found {}",
data_response.get_label()
))),
}
}
}
impl TryFrom<StorageServiceResponse>
for (TransactionOutputListWithProof, LedgerInfoWithSignatures)
{
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::NewTransactionOutputsWithProof(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected new_transaction_outputs_with_proof, found {}",
data_response.get_label()
))),
}
}
}
impl TryFrom<StorageServiceResponse> for (TransactionListWithProof, LedgerInfoWithSignatures) {
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::NewTransactionsWithProof(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected new_transactions_with_proof, found {}",
data_response.get_label()
))),
}
}
}
impl TryFrom<StorageServiceResponse> for u64 {
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::NumberOfStatesAtVersion(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected number_of_states_at_version, found {}",
data_response.get_label()
))),
}
}
}
impl TryFrom<StorageServiceResponse> for ServerProtocolVersion {
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::ServerProtocolVersion(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected server_protocol_version, found {}",
data_response.get_label()
))),
}
}
}
impl TryFrom<StorageServiceResponse> for StorageServerSummary {
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::StorageServerSummary(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected storage_server_summary, found {}",
data_response.get_label()
))),
}
}
}
impl TryFrom<StorageServiceResponse> for TransactionOutputListWithProof {
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::TransactionOutputsWithProof(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected transaction_outputs_with_proof, found {}",
data_response.get_label()
))),
}
}
}
impl TryFrom<StorageServiceResponse> for TransactionListWithProof {
type Error = crate::responses::Error;
fn try_from(response: StorageServiceResponse) -> crate::Result<Self, Self::Error> {
let data_response = response.get_data_response()?;
match data_response {
DataResponse::TransactionsWithProof(inner) => Ok(inner),
_ => Err(Error::UnexpectedResponseError(format!(
"expected transactions_with_proof, found {}",
data_response.get_label()
))),
}
}
}
/// The protocol version reported by a storage server.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ServerProtocolVersion {
    /// The version number of the protocol.
    pub protocol_version: u64,
}
/// A summary of a storage server: its protocol limits plus the data it holds.
/// Both parts are consulted when deciding whether a request can be serviced.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct StorageServerSummary {
/// The chunk size limits of the server.
pub protocol_metadata: ProtocolMetadata,
/// The ranges of data (and synced ledger info) held by the server.
pub data_summary: DataSummary,
}
impl StorageServerSummary {
    /// Returns `true` iff both the protocol metadata and the data summary report
    /// that `request` can be serviced.
    pub fn can_service(&self, request: &StorageServiceRequest) -> bool {
        // The data summary is only consulted once the protocol limits are satisfied.
        if !self.protocol_metadata.can_service(request) {
            return false;
        }
        self.data_summary.can_service(request)
    }
}
/// The chunk size limits of a storage server. A ranged request is only serviceable
/// when the number of items it asks for does not exceed the matching limit.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct ProtocolMetadata {
    /// The largest epoch range accepted for epoch ending ledger info requests.
    pub max_epoch_chunk_size: u64,
    /// The largest index range accepted for state value requests.
    pub max_state_chunk_size: u64,
    /// The largest version range accepted for transaction requests.
    pub max_transaction_chunk_size: u64,
    /// The largest version range accepted for transaction output requests.
    pub max_transaction_output_chunk_size: u64,
}
impl ProtocolMetadata {
pub fn can_service(&self, request: &StorageServiceRequest) -> bool {
match &request.data_request {
GetNewTransactionsWithProof(_)
| GetNewTransactionOutputsWithProof(_)
| GetNumberOfStatesAtVersion(_)
| GetServerProtocolVersion
| GetStorageServerSummary => true,
GetStateValuesWithProof(request) => CompleteDataRange::new(
request.start_index,
request.end_index,
)
.map_or(false, |range| {
range
.len()
.map_or(false, |chunk_size| self.max_state_chunk_size >= chunk_size)
}),
GetEpochEndingLedgerInfos(request) => CompleteDataRange::new(
request.start_epoch,
request.expected_end_epoch,
)
.map_or(false, |range| {
range
.len()
.map_or(false, |chunk_size| self.max_epoch_chunk_size >= chunk_size)
}),
GetTransactionOutputsWithProof(request) => CompleteDataRange::new(
request.start_version,
request.end_version,
)
.map_or(false, |range| {
range.len().map_or(false, |chunk_size| {
self.max_transaction_output_chunk_size >= chunk_size
})
}),
GetTransactionsWithProof(request) => CompleteDataRange::new(
request.start_version,
request.end_version,
)
.map_or(false, |range| {
range.len().map_or(false, |chunk_size| {
self.max_transaction_chunk_size >= chunk_size
})
}),
}
}
}
impl Default for ProtocolMetadata {
fn default() -> Self {
let config = StorageServiceConfig::default();
Self {
max_epoch_chunk_size: config.max_epoch_chunk_size,
max_transaction_chunk_size: config.max_transaction_chunk_size,
max_transaction_output_chunk_size: config.max_transaction_output_chunk_size,
max_state_chunk_size: config.max_state_chunk_size,
}
}
}
/// A summary of the data held by a storage server, used by `can_service` to decide
/// whether a request can be serviced. Every field is `None` by default.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DataSummary {
/// The synced ledger info, if any. `can_service` compares its version against the
/// proof version of a request to decide whether a proof can be created.
pub synced_ledger_info: Option<LedgerInfoWithSignatures>,
/// The range of epochs checked for epoch ending ledger info requests.
pub epoch_ending_ledger_infos: Option<CompleteDataRange<Epoch>>,
/// The range of versions checked for state requests.
pub states: Option<CompleteDataRange<Version>>,
/// The range of versions checked for transaction requests.
pub transactions: Option<CompleteDataRange<Version>>,
/// The range of versions checked for transaction output requests.
pub transaction_outputs: Option<CompleteDataRange<Version>>,
}
impl DataSummary {
pub fn can_service(&self, request: &StorageServiceRequest) -> bool {
match &request.data_request {
GetNewTransactionsWithProof(_)
| GetNewTransactionOutputsWithProof(_)
| GetServerProtocolVersion
| GetStorageServerSummary => true,
GetEpochEndingLedgerInfos(request) => {
let desired_range =
match CompleteDataRange::new(request.start_epoch, request.expected_end_epoch) {
Ok(desired_range) => desired_range,
Err(_) => return false,
};
self.epoch_ending_ledger_infos
.map(|range| range.superset_of(&desired_range))
.unwrap_or(false)
}
GetNumberOfStatesAtVersion(version) => self
.states
.map(|range| range.contains(*version))
.unwrap_or(false),
GetStateValuesWithProof(request) => {
let proof_version = request.version;
let can_serve_states = self
.states
.map(|range| range.contains(request.version))
.unwrap_or(false);
let can_create_proof = self
.synced_ledger_info
.as_ref()
.map(|li| li.ledger_info().version() >= proof_version)
.unwrap_or(false);
can_serve_states && can_create_proof
}
GetTransactionOutputsWithProof(request) => {
let desired_range =
match CompleteDataRange::new(request.start_version, request.end_version) {
Ok(desired_range) => desired_range,
Err(_) => return false,
};
let can_serve_outputs = self
.transaction_outputs
.map(|range| range.superset_of(&desired_range))
.unwrap_or(false);
let can_create_proof = self
.synced_ledger_info
.as_ref()
.map(|li| li.ledger_info().version() >= request.proof_version)
.unwrap_or(false);
can_serve_outputs && can_create_proof
}
GetTransactionsWithProof(request) => {
let desired_range =
match CompleteDataRange::new(request.start_version, request.end_version) {
Ok(desired_range) => desired_range,
Err(_) => return false,
};
let can_serve_txns = self
.transactions
.map(|range| range.superset_of(&desired_range))
.unwrap_or(false);
let can_create_proof = self
.synced_ledger_info
.as_ref()
.map(|li| li.ledger_info().version() >= request.proof_version)
.unwrap_or(false);
can_serve_txns && can_create_proof
}
}
}
}
/// An inclusive range of items, from `lowest` to `highest`.
///
/// The fields are private so that ranges are built through the constructors; `new`
/// rejects inverted bounds and ranges whose length would overflow `T`. `Deserialize`
/// is implemented by hand (not derived) so that decoded ranges go through `new` too.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize)]
pub struct CompleteDataRange<T> {
// The first item of the range (inclusive).
lowest: T,
// The last item of the range (inclusive).
highest: T,
}
/// Computes the number of items in the inclusive range `[lowest, highest]`, i.e.
/// `highest - lowest + 1`, using checked arithmetic.
///
/// # Errors
/// Returns `DegenerateRangeError` if the subtraction or the addition overflows.
fn range_length_checked<T: PrimInt>(lowest: T, highest: T) -> crate::Result<T, Error> {
    let distance = highest.checked_sub(&lowest);
    match distance.and_then(|distance| distance.checked_add(&T::one())) {
        Some(length) => Ok(length),
        None => Err(DegenerateRangeError),
    }
}
impl<T: PrimInt> CompleteDataRange<T> {
    /// Creates the inclusive range `[lowest, highest]`.
    ///
    /// # Errors
    /// Returns `DegenerateRangeError` if `lowest > highest`, or if the length of the
    /// range cannot be represented in `T`.
    pub fn new(lowest: T, highest: T) -> crate::Result<Self, Error> {
        if lowest > highest {
            return Err(DegenerateRangeError);
        }
        // The length itself is discarded; computing it only proves it fits in `T`.
        range_length_checked(lowest, highest).map(|_| Self { lowest, highest })
    }

    /// Creates the range that starts at `lowest` and holds `len` items.
    ///
    /// # Errors
    /// Returns `DegenerateRangeError` if computing `lowest + (len - 1)` overflows,
    /// or if the resulting bounds are rejected by [`Self::new`].
    pub fn from_len(lowest: T, len: T) -> crate::Result<Self, Error> {
        let offset = len.checked_sub(&T::one()).ok_or(DegenerateRangeError)?;
        let highest = lowest.checked_add(&offset).ok_or(DegenerateRangeError)?;
        Self::new(lowest, highest)
    }

    /// Returns the first item of the range.
    #[inline]
    pub fn lowest(&self) -> T {
        self.lowest
    }

    /// Returns the last item of the range.
    #[inline]
    pub fn highest(&self) -> T {
        self.highest
    }

    /// Returns the number of items in the range.
    ///
    /// # Errors
    /// Returns `DegenerateRangeError` if the length cannot be represented in `T`.
    #[inline]
    pub fn len(&self) -> crate::Result<T, Error> {
        range_length_checked(self.lowest, self.highest)
    }

    /// Returns `true` iff `item` lies within the range (bounds included).
    pub fn contains(&self, item: T) -> bool {
        (self.lowest..=self.highest).contains(&item)
    }

    /// Returns `true` iff every item of `other` is also within this range.
    pub fn superset_of(&self, other: &Self) -> bool {
        other.lowest >= self.lowest && self.highest >= other.highest
    }
}
impl<T: Zero> CompleteDataRange<T> {
    /// Creates the range that starts at zero and ends at `highest` (inclusive).
    ///
    /// No validation is performed here, so the length of the resulting range is not
    /// checked to fit in `T`.
    pub fn from_genesis(highest: T) -> Self {
        let lowest = T::zero();
        Self { lowest, highest }
    }
}
/// Deserializes a range and validates it, so that a degenerate range is rejected
/// instead of being constructed.
impl<'de, T> serde::Deserialize<'de> for CompleteDataRange<T>
where
    T: PrimInt + serde::Deserialize<'de>,
{
    fn deserialize<D>(deserializer: D) -> crate::Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        use serde::de::Error;

        // Unvalidated mirror of `CompleteDataRange`; the rename keeps the
        // serialized name identical to the real type.
        #[derive(Deserialize)]
        #[serde(rename = "CompleteDataRange")]
        struct UncheckedRange<U> {
            lowest: U,
            highest: U,
        }

        let UncheckedRange { lowest, highest } = UncheckedRange::<T>::deserialize(deserializer)?;
        // Route the bounds through `new` to enforce the range invariants.
        Self::new(lowest, highest).map_err(D::Error::custom)
    }
}
/// Generates arbitrary valid ranges for property tests: random pairs of bounds are
/// drawn, and the pairs rejected by `CompleteDataRange::new` are filtered out.
#[cfg(test)]
impl<T> Arbitrary for CompleteDataRange<T>
where
    T: PrimInt + Arbitrary + 'static,
{
    type Parameters = ();
    type Strategy = BoxedStrategy<Self>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        let bounds = (any::<T>(), any::<T>());
        bounds
            .prop_filter_map("degenerate range", |(lowest, highest)| {
                let candidate = CompleteDataRange::new(lowest, highest);
                candidate.ok()
            })
            .boxed()
    }
}