use crate::{
data_notification::{
DataClientRequest,
DataClientRequest::{
EpochEndingLedgerInfos, NewTransactionOutputsWithProof, NewTransactionsWithProof,
NumberOfStates, StateValuesWithProof, TransactionOutputsWithProof,
TransactionsWithProof,
},
DataNotification, DataPayload, EpochEndingLedgerInfosRequest,
NewTransactionOutputsWithProofRequest, NewTransactionsWithProofRequest,
NumberOfStatesRequest, StateValuesWithProofRequest, TransactionOutputsWithProofRequest,
TransactionsWithProofRequest,
},
error::Error,
logging::{LogEntry, LogEvent, LogSchema},
streaming_client::{
Epoch, GetAllEpochEndingLedgerInfosRequest, GetAllStatesRequest, StreamRequest,
},
};
use aptos_data_client::{AdvertisedData, GlobalDataSummary, ResponsePayload};
use aptos_id_generator::{IdGenerator, U64IdGenerator};
use aptos_logger::prelude::*;
use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::Version};
use enum_dispatch::enum_dispatch;
use std::{cmp, sync::Arc};
/// Panics with the offending client request and the engine that received it.
/// Used when a data stream engine is handed a request type it cannot service
/// (always a programming error, never a recoverable condition).
macro_rules! invalid_client_request {
    ($request:expr, $engine:expr) => {
        panic!(
            "Invalid client request {:?} found for the data stream engine {:?}",
            $request, $engine
        )
    };
}
/// Panics with the client response whose payload type did not match the
/// request that produced it (always a programming error).
macro_rules! invalid_response_type {
    ($response:expr) => {
        panic!("The client response is type mismatched: {:?}", $response)
    };
}
/// Panics with the offending stream request. Used when a stream engine is
/// constructed from (or asked to service) a stream request type it does not
/// support (always a programming error).
macro_rules! invalid_stream_request {
    ($stream_request:expr) => {
        // Format the request directly. Previously the request was first
        // rendered via `format!("{:?}", ...)` and that string was then
        // debug-printed again, wrapping the output in quotes and escaping
        // every inner quote — pure noise in the panic message.
        panic!("Invalid stream request found {:?}", $stream_request)
    };
}
/// The interface shared by all data stream engines. An engine tracks the
/// progress of a single data stream: it decides which client requests to
/// send next and turns client responses into notifications for the stream
/// listener.
#[enum_dispatch]
pub trait DataStreamEngine {
    /// Creates a batch of data client requests (up to `max_number_of_requests`)
    /// for the stream, sized using the optimal chunk sizes in the given
    /// global data summary.
    fn create_data_client_requests(
        &mut self,
        max_number_of_requests: u64,
        global_data_summary: &GlobalDataSummary,
    ) -> Result<Vec<DataClientRequest>, Error>;
    /// Returns true iff the data this stream still needs is currently
    /// advertised by peers in the network.
    fn is_remaining_data_available(&self, advertised_data: &AdvertisedData) -> bool;
    /// Returns true iff the stream has sent all data notifications to the listener.
    fn is_stream_complete(&self) -> bool;
    /// Transforms a client response payload (for the given request) into a
    /// data notification. Returns `Ok(None)` when the response only updates
    /// internal bookkeeping and produces no listener-facing notification.
    fn transform_client_response_into_notification(
        &mut self,
        client_request: &DataClientRequest,
        client_response_payload: ResponsePayload,
        notification_id_generator: Arc<U64IdGenerator>,
    ) -> Result<Option<DataNotification>, Error>;
}
/// The set of all data stream engines. `enum_dispatch` forwards the
/// `DataStreamEngine` trait calls to the wrapped engine variant.
#[enum_dispatch(DataStreamEngine)]
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum StreamEngine {
    ContinuousTransactionStreamEngine,
    EpochEndingStreamEngine,
    StateStreamEngine,
    TransactionStreamEngine,
}
impl StreamEngine {
    /// Instantiates the appropriate stream engine for the given stream
    /// request, consulting the advertised data where required (e.g., to
    /// identify the highest epoch ending ledger info in the network).
    pub fn new(
        stream_request: &StreamRequest,
        advertised_data: &AdvertisedData,
    ) -> Result<Self, Error> {
        match stream_request {
            StreamRequest::ContinuouslyStreamTransactions(_)
            | StreamRequest::ContinuouslyStreamTransactionOutputs(_) => {
                Ok(ContinuousTransactionStreamEngine::new(stream_request)?.into())
            }
            StreamRequest::GetAllEpochEndingLedgerInfos(request) => {
                Ok(EpochEndingStreamEngine::new(request, advertised_data)?.into())
            }
            StreamRequest::GetAllStates(request) => Ok(StateStreamEngine::new(request)?.into()),
            StreamRequest::GetAllTransactions(_) | StreamRequest::GetAllTransactionOutputs(_) => {
                Ok(TransactionStreamEngine::new(stream_request)?.into())
            }
            _ => Err(Error::UnsupportedRequestEncountered(format!(
                "Stream request not supported: {:?}",
                stream_request
            ))),
        }
    }
}
/// A stream engine for streaming all state values at a fixed version.
#[derive(Clone, Debug)]
pub struct StateStreamEngine {
    // The original stream request made by the client (i.e., a state values request).
    pub request: GetAllStatesRequest,
    // True iff a request for the total number of states has been sent and
    // the response is still outstanding.
    pub state_num_requested: bool,
    // The total number of states at the requested version (None until fetched).
    pub number_of_states: Option<u64>,
    // The next state index to send to the stream listener.
    pub next_stream_index: u64,
    // The next state index to request from the network.
    pub next_request_index: u64,
    // True iff all state values have been sent to the listener.
    pub stream_is_complete: bool,
}
impl StateStreamEngine {
    /// Creates a new state stream engine starting at the requested index.
    fn new(request: &GetAllStatesRequest) -> Result<Self, Error> {
        let start_index = request.start_index;
        Ok(StateStreamEngine {
            request: request.clone(),
            state_num_requested: false,
            number_of_states: None,
            next_stream_index: start_index,
            next_request_index: start_index,
            stream_is_complete: false,
        })
    }
    /// Advances the next request index past the highest index covered by
    /// the newly created client requests.
    fn update_request_tracking(
        &mut self,
        client_requests: &[DataClientRequest],
    ) -> Result<(), Error> {
        for client_request in client_requests.iter() {
            if let StateValuesWithProof(request) = client_request {
                self.next_request_index = request.end_index.checked_add(1).ok_or_else(|| {
                    Error::IntegerOverflow("Next request index has overflown!".into())
                })?;
            } else {
                invalid_client_request!(client_request, self);
            }
        }
        Ok(())
    }
    /// Returns the total number of states. Panics if it hasn't been fetched yet.
    fn get_number_of_states(&self) -> u64 {
        self.number_of_states
            .expect("Number of states is not initialized!")
    }
}
impl DataStreamEngine for StateStreamEngine {
fn create_data_client_requests(
&mut self,
max_number_of_requests: u64,
global_data_summary: &GlobalDataSummary,
) -> Result<Vec<DataClientRequest>, Error> {
if self.number_of_states.is_none() && self.state_num_requested {
return Ok(vec![]); }
if let Some(number_of_states) = self.number_of_states {
let end_state_index = number_of_states
.checked_sub(1)
.ok_or_else(|| Error::IntegerOverflow("End state index has overflown!".into()))?;
let client_requests = create_data_client_requests(
self.next_request_index,
end_state_index,
max_number_of_requests,
global_data_summary.optimal_chunk_sizes.state_chunk_size,
self.clone().into(),
)?;
self.update_request_tracking(&client_requests)?;
Ok(client_requests)
} else {
info!(
(LogSchema::new(LogEntry::AptosDataClient)
.event(LogEvent::Pending)
.message(&format!(
"Requested the number of states at version: {:?}",
self.request.version
)))
);
self.state_num_requested = true;
Ok(vec![DataClientRequest::NumberOfStates(
NumberOfStatesRequest {
version: self.request.version,
},
)])
}
}
fn is_remaining_data_available(&self, advertised_data: &AdvertisedData) -> bool {
AdvertisedData::contains_range(
self.request.version,
self.request.version,
&advertised_data.states,
)
}
fn is_stream_complete(&self) -> bool {
self.stream_is_complete
}
fn transform_client_response_into_notification(
&mut self,
client_request: &DataClientRequest,
client_response_payload: ResponsePayload,
notification_id_generator: Arc<U64IdGenerator>,
) -> Result<Option<DataNotification>, Error> {
match client_request {
StateValuesWithProof(request) => {
verify_client_request_indices(
self.next_stream_index,
request.start_index,
request.end_index,
);
self.next_stream_index = request.end_index.checked_add(1).ok_or_else(|| {
Error::IntegerOverflow("Next stream index has overflown!".into())
})?;
if request.end_index
== self
.get_number_of_states()
.checked_sub(1)
.ok_or_else(|| Error::IntegerOverflow("End index has overflown!".into()))?
{
self.stream_is_complete = true;
}
let data_notification = create_data_notification(
notification_id_generator,
client_response_payload,
None,
self.clone().into(),
);
return Ok(Some(data_notification));
}
NumberOfStates(request) => {
if let ResponsePayload::NumberOfStates(number_of_states) = client_response_payload {
info!(
(LogSchema::new(LogEntry::ReceivedDataResponse)
.event(LogEvent::Success)
.message(&format!(
"Received number of states at version: {:?}. Total states: {:?}",
request.version, number_of_states
)))
);
self.state_num_requested = false;
if number_of_states < self.next_request_index {
return Err(Error::NoDataToFetch(format!(
"The next state index to fetch is higher than the \
total number of states. Next index: {:?}, total states: {:?}",
self.next_request_index, number_of_states
)));
} else {
self.number_of_states = Some(number_of_states);
}
}
}
request => invalid_client_request!(request, self),
}
Ok(None)
}
}
/// A stream engine for continuously streaming transactions or transaction
/// outputs, syncing towards successive target ledger infos (and falling back
/// to subscriptions for brand new data).
#[derive(Clone, Debug)]
pub struct ContinuousTransactionStreamEngine {
    // The original stream request made by the client (i.e., a continuous
    // transaction or transaction output stream request).
    pub request: StreamRequest,
    // The target ledger info currently being synced towards (None when a
    // new target still needs to be selected).
    pub current_target_ledger_info: Option<LedgerInfoWithSignatures>,
    // True iff a request for an epoch ending ledger info is in flight.
    pub end_of_epoch_requested: bool,
    // True iff a subscription request (for new data) is in flight.
    pub subscription_requested: bool,
    // The next (version, epoch) to send to the stream listener.
    pub next_stream_version_and_epoch: (Version, Epoch),
    // The next (version, epoch) to request from the network.
    pub next_request_version_and_epoch: (Version, Epoch),
    // True iff all data up to the request's explicit target has been sent.
    pub stream_is_complete: bool,
}
impl ContinuousTransactionStreamEngine {
/// Creates a new continuous transaction stream engine for the given stream
/// request (transactions or transaction outputs).
fn new(stream_request: &StreamRequest) -> Result<Self, Error> {
    // Both supported stream variants carry the same starting point.
    let (known_version, known_epoch) = match stream_request {
        StreamRequest::ContinuouslyStreamTransactions(request) => {
            (request.known_version, request.known_epoch)
        }
        StreamRequest::ContinuouslyStreamTransactionOutputs(request) => {
            (request.known_version, request.known_epoch)
        }
        request => invalid_stream_request!(request),
    };
    let (next_version, next_epoch) =
        Self::calculate_next_version_and_epoch(known_version, known_epoch)?;
    Ok(ContinuousTransactionStreamEngine {
        request: stream_request.clone(),
        current_target_ledger_info: None,
        end_of_epoch_requested: false,
        subscription_requested: false,
        next_stream_version_and_epoch: (next_version, next_epoch),
        next_request_version_and_epoch: (next_version, next_epoch),
        stream_is_complete: false,
    })
}
fn calculate_next_version_and_epoch(
known_version: Version,
known_epoch: Epoch,
) -> Result<(Version, Epoch), Error> {
let next_version = known_version
.checked_add(1)
.ok_or_else(|| Error::IntegerOverflow("Next version has overflown!".into()))?;
Ok((next_version, known_epoch))
}
fn select_target_ledger_info(
&self,
advertised_data: &AdvertisedData,
) -> Result<Option<LedgerInfoWithSignatures>, Error> {
match &self.request {
StreamRequest::ContinuouslyStreamTransactions(request) => {
if let Some(target) = &request.target {
return Ok(Some(target.clone()));
}
}
StreamRequest::ContinuouslyStreamTransactionOutputs(request) => {
if let Some(target) = &request.target {
return Ok(Some(target.clone()));
}
}
request => invalid_stream_request!(request),
};
if let Some(highest_synced_ledger_info) = advertised_data.highest_synced_ledger_info() {
let (next_request_version, _) = self.next_request_version_and_epoch;
if next_request_version > highest_synced_ledger_info.ledger_info().version() {
Ok(None) } else {
Ok(Some(highest_synced_ledger_info))
}
} else {
Err(Error::DataIsUnavailable(
"Unable to find the highest synced ledger info!".into(),
))
}
}
/// Returns the current target ledger info.
///
/// Panics if no target is currently set; callers must only invoke this
/// after a target has been selected.
fn get_target_ledger_info(&self) -> &LedgerInfoWithSignatures {
    self.current_target_ledger_info
        .as_ref()
        .expect("No current target ledger info found!")
}
/// Builds a data notification for a continuous data chunk covering
/// [request_start, request_end], advancing stream-side version/epoch
/// tracking before handing the payload to the listener.
fn create_notification_for_continuous_data(
    &mut self,
    request_start: Version,
    request_end: Version,
    client_response_payload: ResponsePayload,
    notification_id_generator: Arc<U64IdGenerator>,
) -> Result<DataNotification, Error> {
    // Snapshot the target first: the tracking update may clear it.
    let target_ledger_info = self.get_target_ledger_info().clone();
    self.update_stream_version_and_epoch(request_start, request_end, &target_ledger_info)?;
    Ok(create_data_notification(
        notification_id_generator,
        client_response_payload,
        Some(target_ledger_info),
        self.clone().into(),
    ))
}
/// Builds a data notification from a subscription (new data) response.
///
/// The payload must contain at least one new transaction (or output)
/// starting directly after `known_version`. Both the request-side and the
/// stream-side version/epoch trackers are advanced past the received data
/// before the notification is created.
fn create_notification_for_subscription_data(
    &mut self,
    known_version: Version,
    client_response_payload: ResponsePayload,
    notification_id_generator: Arc<U64IdGenerator>,
) -> Result<DataNotification, Error> {
    // Subscription data begins at the version directly after the known version.
    let first_version = known_version
        .checked_add(1)
        .ok_or_else(|| Error::IntegerOverflow("First version has overflown!".into()))?;
    // Extract the number of received versions and the accompanying target
    // ledger info from the response payload.
    let (num_versions, target_ledger_info) = match &client_response_payload {
        ResponsePayload::NewTransactionsWithProof((
            transactions_with_proof,
            target_ledger_info,
        )) => (
            transactions_with_proof.transactions.len(),
            target_ledger_info.clone(),
        ),
        ResponsePayload::NewTransactionOutputsWithProof((
            outputs_with_proof,
            target_ledger_info,
        )) => (
            outputs_with_proof.transactions_and_outputs.len(),
            target_ledger_info.clone(),
        ),
        response_payload => {
            return Err(Error::AptosDataClientResponseIsInvalid(format!(
                "Expected new transactions or outputs but got: {:?}",
                response_payload
            )));
        }
    };
    // An empty data payload is invalid for a subscription response.
    if num_versions == 0 {
        return Err(Error::AptosDataClientResponseIsInvalid(
            "Received an empty transaction or output list!".into(),
        ));
    }
    let last_version = known_version
        .checked_add(num_versions as u64)
        .ok_or_else(|| Error::IntegerOverflow("Last version has overflown!".into()))?;
    // Advance both trackers past the received data (request side first).
    self.update_request_version_and_epoch(last_version, &target_ledger_info)?;
    self.update_stream_version_and_epoch(first_version, last_version, &target_ledger_info)?;
    let data_notification = create_data_notification(
        notification_id_generator,
        client_response_payload,
        Some(target_ledger_info.clone()),
        self.clone().into(),
    );
    Ok(data_notification)
}
/// Builds a subscription request for data beyond the highest version we
/// have already requested (i.e., with `next_request_version - 1` as the
/// known version).
fn create_subscription_request(&mut self) -> Result<DataClientRequest, Error> {
    let (next_request_version, known_epoch) = self.next_request_version_and_epoch;
    let known_version = next_request_version
        .checked_sub(1)
        .ok_or_else(|| Error::IntegerOverflow("Last version has overflown!".into()))?;
    match &self.request {
        StreamRequest::ContinuouslyStreamTransactions(request) => Ok(
            DataClientRequest::NewTransactionsWithProof(NewTransactionsWithProofRequest {
                known_version,
                known_epoch,
                include_events: request.include_events,
            }),
        ),
        StreamRequest::ContinuouslyStreamTransactionOutputs(_) => Ok(
            DataClientRequest::NewTransactionOutputsWithProof(
                NewTransactionOutputsWithProofRequest {
                    known_version,
                    known_epoch,
                },
            ),
        ),
        request => invalid_stream_request!(request),
    }
}
fn handle_epoch_ending_response(
&mut self,
response_payload: ResponsePayload,
) -> Result<(), Error> {
if let ResponsePayload::EpochEndingLedgerInfos(epoch_ending_ledger_infos) = response_payload
{
match &epoch_ending_ledger_infos[..] {
[target_ledger_info] => {
info!(
(LogSchema::new(LogEntry::ReceivedDataResponse)
.event(LogEvent::Success)
.message(&format!(
"Received an epoch ending ledger info for epoch: {:?}. \
Setting new target version: {:?}",
target_ledger_info.ledger_info().epoch(),
target_ledger_info.ledger_info().version()
)))
);
self.current_target_ledger_info = Some(target_ledger_info.clone());
Ok(())
}
response_payload => {
return Err(Error::AptosDataClientResponseIsInvalid(format!(
"Received an incorrect number of epoch ending ledger infos. Response: {:?}",
response_payload
)));
}
}
} else {
return Err(Error::AptosDataClientResponseIsInvalid(format!(
"Expected an epoch ending ledger response but got: {:?}",
response_payload
)));
}
}
/// Advances the stream-side (version, epoch) tracking past the data chunk
/// covering [request_start_version, request_end_version], after verifying
/// that the chunk lines up with the expected next stream version.
///
/// If the chunk ends exactly at an epoch-ending target ledger info, the
/// stream epoch is bumped. If the chunk reaches the stream request's
/// explicit target, the stream is marked complete. If it reaches the
/// current (possibly interim) target, the target is cleared so a new one
/// can be selected.
fn update_stream_version_and_epoch(
    &mut self,
    request_start_version: Version,
    request_end_version: Version,
    target_ledger_info: &LedgerInfoWithSignatures,
) -> Result<(), Error> {
    let (next_stream_version, mut next_stream_epoch) = self.next_stream_version_and_epoch;
    // Panics if the chunk doesn't start at the expected next version.
    verify_client_request_indices(
        next_stream_version,
        request_start_version,
        request_end_version,
    );
    // Bump the epoch when the chunk ends at an epoch-ending ledger info.
    if request_end_version == target_ledger_info.ledger_info().version()
        && target_ledger_info.ledger_info().ends_epoch()
    {
        next_stream_epoch = next_stream_epoch
            .checked_add(1)
            .ok_or_else(|| Error::IntegerOverflow("Next stream epoch has overflown!".into()))?;
    }
    let next_stream_version = request_end_version
        .checked_add(1)
        .ok_or_else(|| Error::IntegerOverflow("Next stream version has overflown!".into()))?;
    self.next_stream_version_and_epoch = (next_stream_version, next_stream_epoch);
    // The stream only completes when the request's explicit target
    // (if any) has been reached.
    match &self.request {
        StreamRequest::ContinuouslyStreamTransactions(request) => {
            if let Some(target) = &request.target {
                if request_end_version == target.ledger_info().version() {
                    self.stream_is_complete = true;
                }
            }
        }
        StreamRequest::ContinuouslyStreamTransactionOutputs(request) => {
            if let Some(target) = &request.target {
                if request_end_version == target.ledger_info().version() {
                    self.stream_is_complete = true;
                }
            }
        }
        request => invalid_stream_request!(request),
    };
    // The current target has been fully synced: clear it so the next round
    // of requests selects a fresh target.
    if request_end_version == target_ledger_info.ledger_info().version() {
        self.current_target_ledger_info = None;
    }
    Ok(())
}
fn update_request_version_and_epoch(
&mut self,
request_end_version: Version,
target_ledger_info: &LedgerInfoWithSignatures,
) -> Result<(), Error> {
let (_, mut next_request_epoch) = self.next_request_version_and_epoch;
if request_end_version == target_ledger_info.ledger_info().version()
&& target_ledger_info.ledger_info().ends_epoch()
{
next_request_epoch = next_request_epoch.checked_add(1).ok_or_else(|| {
Error::IntegerOverflow("Next request epoch has overflown!".into())
})?;
}
let next_request_version = request_end_version
.checked_add(1)
.ok_or_else(|| Error::IntegerOverflow("Next request version has overflown!".into()))?;
self.next_request_version_and_epoch = (next_request_version, next_request_epoch);
Ok(())
}
/// Advances the request-side version/epoch tracking past every newly
/// created client request (each request must match the stream type).
fn update_request_tracking(
    &mut self,
    client_requests: &[DataClientRequest],
    target_ledger_info: &LedgerInfoWithSignatures,
) -> Result<(), Error> {
    match &self.request {
        StreamRequest::ContinuouslyStreamTransactions(_) => {
            for client_request in client_requests.iter() {
                if let DataClientRequest::TransactionsWithProof(request) = client_request {
                    self.update_request_version_and_epoch(
                        request.end_version,
                        target_ledger_info,
                    )?;
                } else {
                    invalid_client_request!(client_request, self);
                }
            }
        }
        StreamRequest::ContinuouslyStreamTransactionOutputs(_) => {
            for client_request in client_requests.iter() {
                if let DataClientRequest::TransactionOutputsWithProof(request) = client_request {
                    self.update_request_version_and_epoch(
                        request.end_version,
                        target_ledger_info,
                    )?;
                } else {
                    invalid_client_request!(client_request, self);
                }
            }
        }
        request => invalid_stream_request!(request),
    }
    Ok(())
}
}
impl DataStreamEngine for ContinuousTransactionStreamEngine {
/// Creates the next batch of client requests for the continuous stream.
///
/// The flow is: (1) wait if a bookkeeping or subscription request is in
/// flight; (2) select a target ledger info if none is active, first
/// fetching the epoch ending ledger info when the target lies in a future
/// epoch; (3) with a target in hand, chunk the remaining versions into
/// requests; (4) with no target available, fall back to a subscription.
fn create_data_client_requests(
    &mut self,
    max_number_of_requests: u64,
    global_data_summary: &GlobalDataSummary,
) -> Result<Vec<DataClientRequest>, Error> {
    // An epoch ending ledger info request or a subscription is already in
    // flight: don't issue anything else yet.
    if self.end_of_epoch_requested || self.subscription_requested {
        return Ok(vec![]);
    }
    let (next_request_version, next_request_epoch) = self.next_request_version_and_epoch;
    // If there's no active target, try to select one.
    if self.current_target_ledger_info.is_none() {
        if let Some(target_ledger_info) =
            self.select_target_ledger_info(&global_data_summary.advertised_data)?
        {
            if target_ledger_info.ledger_info().epoch() > next_request_epoch {
                // The selected target is in a future epoch: first fetch the
                // epoch ending ledger info for the current epoch (it becomes
                // the interim target; see `handle_epoch_ending_response`).
                info!(
                    (LogSchema::new(LogEntry::AptosDataClient)
                        .event(LogEvent::Pending)
                        .message(&format!(
                            "Requested an epoch ending ledger info for epoch: {:?}",
                            next_request_epoch
                        )))
                );
                self.end_of_epoch_requested = true;
                return Ok(vec![DataClientRequest::EpochEndingLedgerInfos(
                    EpochEndingLedgerInfosRequest {
                        start_epoch: next_request_epoch,
                        end_epoch: next_request_epoch,
                    },
                )]);
            } else {
                debug!(
                    (LogSchema::new(LogEntry::ReceivedDataResponse)
                        .event(LogEvent::Success)
                        .message(&format!(
                            "Setting new target ledger info. Version: {:?}, Epoch: {:?}",
                            target_ledger_info.ledger_info().version(),
                            target_ledger_info.ledger_info().epoch()
                        )))
                );
                self.current_target_ledger_info = Some(target_ledger_info);
            }
        }
    }
    let maybe_target_ledger_info = self.current_target_ledger_info.clone();
    let client_requests = if let Some(target_ledger_info) = maybe_target_ledger_info {
        // Everything up to the target has already been requested.
        if next_request_version > target_ledger_info.ledger_info().version() {
            return Ok(vec![]);
        }
        let optimal_chunk_sizes = match &self.request {
            StreamRequest::ContinuouslyStreamTransactions(_) => {
                global_data_summary
                    .optimal_chunk_sizes
                    .transaction_chunk_size
            }
            StreamRequest::ContinuouslyStreamTransactionOutputs(_) => {
                global_data_summary
                    .optimal_chunk_sizes
                    .transaction_output_chunk_size
            }
            request => invalid_stream_request!(request),
        };
        // Chunk the versions up to (and including) the target version.
        let client_requests = create_data_client_requests(
            next_request_version,
            target_ledger_info.ledger_info().version(),
            max_number_of_requests,
            optimal_chunk_sizes,
            self.clone().into(),
        )?;
        self.update_request_tracking(&client_requests, &target_ledger_info)?;
        client_requests
    } else {
        // No target is available: subscribe to new data instead.
        let subscription_request = self.create_subscription_request()?;
        self.subscription_requested = true;
        vec![subscription_request]
    };
    Ok(client_requests)
}
/// Returns true iff the next version to request is advertised for the
/// stream's data type (transactions or transaction outputs).
fn is_remaining_data_available(&self, advertised_data: &AdvertisedData) -> bool {
    let (next_request_version, _) = self.next_request_version_and_epoch;
    let advertised_ranges = match &self.request {
        StreamRequest::ContinuouslyStreamTransactions(_) => &advertised_data.transactions,
        StreamRequest::ContinuouslyStreamTransactionOutputs(_) => {
            &advertised_data.transaction_outputs
        }
        request => invalid_stream_request!(request),
    };
    // Only the single next version needs to be advertised to make progress.
    AdvertisedData::contains_range(next_request_version, next_request_version, advertised_ranges)
}
/// Returns true iff all data up to the request's target has been sent.
fn is_stream_complete(&self) -> bool {
    self.stream_is_complete
}
/// Transforms a client response into a data notification, dispatching on
/// the request type (which must match the stream request type). Epoch
/// ending responses only update bookkeeping and return `Ok(None)`.
fn transform_client_response_into_notification(
    &mut self,
    client_request: &DataClientRequest,
    client_response_payload: ResponsePayload,
    notification_id_generator: Arc<U64IdGenerator>,
) -> Result<Option<DataNotification>, Error> {
    // A response has arrived, so clear the corresponding in-flight flag.
    // NOTE(review): the flag is cleared without checking that the arriving
    // response matches the request type that set it — presumably at most
    // one such request is in flight at a time; verify against the caller.
    if self.end_of_epoch_requested {
        self.end_of_epoch_requested = false;
    } else if self.subscription_requested {
        self.subscription_requested = false;
    }
    match client_request {
        // Bookkeeping only: installs the new target ledger info.
        EpochEndingLedgerInfos(_) => {
            self.handle_epoch_ending_response(client_response_payload)?;
            Ok(None)
        }
        // Subscription responses (new data past the known version).
        NewTransactionsWithProof(request) => match &self.request {
            StreamRequest::ContinuouslyStreamTransactions(_) => {
                let data_notification = self.create_notification_for_subscription_data(
                    request.known_version,
                    client_response_payload,
                    notification_id_generator,
                )?;
                Ok(Some(data_notification))
            }
            request => invalid_stream_request!(request),
        },
        NewTransactionOutputsWithProof(request) => match &self.request {
            StreamRequest::ContinuouslyStreamTransactionOutputs(_) => {
                let data_notification = self.create_notification_for_subscription_data(
                    request.known_version,
                    client_response_payload,
                    notification_id_generator,
                )?;
                Ok(Some(data_notification))
            }
            request => invalid_stream_request!(request),
        },
        // Continuous (target-driven) chunk responses.
        TransactionsWithProof(request) => match &self.request {
            StreamRequest::ContinuouslyStreamTransactions(_) => {
                let data_notification = self.create_notification_for_continuous_data(
                    request.start_version,
                    request.end_version,
                    client_response_payload,
                    notification_id_generator,
                )?;
                Ok(Some(data_notification))
            }
            request => invalid_stream_request!(request),
        },
        TransactionOutputsWithProof(request) => match &self.request {
            StreamRequest::ContinuouslyStreamTransactionOutputs(_) => {
                let data_notification = self.create_notification_for_continuous_data(
                    request.start_version,
                    request.end_version,
                    client_response_payload,
                    notification_id_generator,
                )?;
                Ok(Some(data_notification))
            }
            request => invalid_stream_request!(request),
        },
        request => invalid_client_request!(request, self),
    }
}
}
/// A stream engine for streaming epoch ending ledger infos.
#[derive(Clone, Debug)]
pub struct EpochEndingStreamEngine {
    // The original stream request made by the client (i.e., an epoch ending
    // ledger info request).
    pub request: GetAllEpochEndingLedgerInfosRequest,
    // The last epoch to stream (the highest epoch ending ledger info
    // advertised at stream creation time).
    pub end_epoch: Epoch,
    // The next epoch to send to the stream listener.
    pub next_stream_epoch: Epoch,
    // The next epoch to request from the network.
    pub next_request_epoch: Epoch,
    // True iff all epoch ending ledger infos have been sent.
    pub stream_is_complete: bool,
}
impl EpochEndingStreamEngine {
    /// Creates a new epoch ending stream engine, bounding the stream by the
    /// highest epoch ending ledger info currently advertised in the network.
    fn new(
        request: &GetAllEpochEndingLedgerInfosRequest,
        advertised_data: &AdvertisedData,
    ) -> Result<Self, Error> {
        let end_epoch = match advertised_data.highest_epoch_ending_ledger_info() {
            Some(end_epoch) => end_epoch,
            None => {
                return Err(Error::DataIsUnavailable(format!(
                    "Unable to find any epoch ending ledger info in the network: {:?}",
                    advertised_data
                )))
            }
        };
        if end_epoch < request.start_epoch {
            return Err(Error::DataIsUnavailable(format!(
                "The epoch to start syncing from is higher than the highest epoch ending ledger info! Highest: {:?}, start: {:?}",
                end_epoch, request.start_epoch
            )));
        }
        info!(
            (LogSchema::new(LogEntry::ReceivedDataResponse)
                .event(LogEvent::Success)
                .message(&format!(
                    "Setting the highest epoch ending ledger info for the stream at: {:?}",
                    end_epoch
                )))
        );
        let start_epoch = request.start_epoch;
        Ok(EpochEndingStreamEngine {
            request: request.clone(),
            end_epoch,
            next_stream_epoch: start_epoch,
            next_request_epoch: start_epoch,
            stream_is_complete: false,
        })
    }
    /// Advances the next request epoch past the highest epoch covered by
    /// the newly created client requests.
    fn update_request_tracking(
        &mut self,
        client_requests: &[DataClientRequest],
    ) -> Result<(), Error> {
        for client_request in client_requests.iter() {
            if let EpochEndingLedgerInfos(request) = client_request {
                self.next_request_epoch = request.end_epoch.checked_add(1).ok_or_else(|| {
                    Error::IntegerOverflow("Next request epoch has overflown!".into())
                })?;
            } else {
                invalid_client_request!(client_request, self);
            }
        }
        Ok(())
    }
}
impl DataStreamEngine for EpochEndingStreamEngine {
fn create_data_client_requests(
&mut self,
max_number_of_requests: u64,
global_data_summary: &GlobalDataSummary,
) -> Result<Vec<DataClientRequest>, Error> {
let client_requests = create_data_client_requests(
self.next_request_epoch,
self.end_epoch,
max_number_of_requests,
global_data_summary.optimal_chunk_sizes.epoch_chunk_size,
self.clone().into(),
)?;
self.update_request_tracking(&client_requests)?;
Ok(client_requests)
}
fn is_remaining_data_available(&self, advertised_data: &AdvertisedData) -> bool {
let start_epoch = self.next_stream_epoch;
let end_epoch = self.end_epoch;
AdvertisedData::contains_range(
start_epoch,
end_epoch,
&advertised_data.epoch_ending_ledger_infos,
)
}
fn is_stream_complete(&self) -> bool {
self.stream_is_complete
}
fn transform_client_response_into_notification(
&mut self,
client_request: &DataClientRequest,
client_response_payload: ResponsePayload,
notification_id_generator: Arc<U64IdGenerator>,
) -> Result<Option<DataNotification>, Error> {
match client_request {
EpochEndingLedgerInfos(request) => {
verify_client_request_indices(
self.next_stream_epoch,
request.start_epoch,
request.end_epoch,
);
self.next_stream_epoch = request.end_epoch.checked_add(1).ok_or_else(|| {
Error::IntegerOverflow("Next stream epoch has overflown!".into())
})?;
if request.end_epoch == self.end_epoch {
self.stream_is_complete = true;
}
let data_notification = create_data_notification(
notification_id_generator,
client_response_payload,
None,
self.clone().into(),
);
Ok(Some(data_notification))
}
request => invalid_client_request!(request, self),
}
}
}
/// A stream engine for streaming a fixed range of transactions or
/// transaction outputs.
#[derive(Clone, Debug)]
pub struct TransactionStreamEngine {
    // The original stream request made by the client (i.e., a transaction
    // or transaction output stream request).
    pub request: StreamRequest,
    // The next version to send to the stream listener.
    pub next_stream_version: Version,
    // The next version to request from the network.
    pub next_request_version: Version,
    // True iff all requested versions have been sent to the listener.
    pub stream_is_complete: bool,
}
impl TransactionStreamEngine {
fn new(stream_request: &StreamRequest) -> Result<Self, Error> {
match stream_request {
StreamRequest::GetAllTransactions(request) => Ok(TransactionStreamEngine {
request: stream_request.clone(),
next_stream_version: request.start_version,
next_request_version: request.start_version,
stream_is_complete: false,
}),
StreamRequest::GetAllTransactionOutputs(request) => Ok(TransactionStreamEngine {
request: stream_request.clone(),
next_stream_version: request.start_version,
next_request_version: request.start_version,
stream_is_complete: false,
}),
request => invalid_stream_request!(request),
}
}
fn update_stream_version(
&mut self,
request_start_version: Version,
request_end_version: Version,
stream_end_version: Version,
) -> Result<(), Error> {
verify_client_request_indices(
self.next_stream_version,
request_start_version,
request_end_version,
);
self.next_stream_version = request_end_version
.checked_add(1)
.ok_or_else(|| Error::IntegerOverflow("Next stream version has overflown!".into()))?;
if request_end_version == stream_end_version {
self.stream_is_complete = true;
}
Ok(())
}
fn update_request_version(&mut self, request_end_version: Version) -> Result<(), Error> {
self.next_request_version = request_end_version
.checked_add(1)
.ok_or_else(|| Error::IntegerOverflow("Next request version has overflown!".into()))?;
Ok(())
}
fn update_request_tracking(
&mut self,
client_requests: &[DataClientRequest],
) -> Result<(), Error> {
match &self.request {
StreamRequest::GetAllTransactions(_) => {
for client_request in client_requests.iter() {
match client_request {
TransactionsWithProof(request) => {
self.update_request_version(request.end_version)?;
}
request => invalid_client_request!(request, self),
}
}
}
StreamRequest::GetAllTransactionOutputs(_) => {
for client_request in client_requests.iter() {
match client_request {
TransactionOutputsWithProof(request) => {
self.update_request_version(request.end_version)?;
}
request => invalid_client_request!(request, self),
}
}
}
request => invalid_stream_request!(request),
}
Ok(())
}
}
impl DataStreamEngine for TransactionStreamEngine {
    /// Chunks the remaining versions (up to the requested end version) into
    /// client requests, sized by the optimal chunk size for the stream type.
    fn create_data_client_requests(
        &mut self,
        max_number_of_requests: u64,
        global_data_summary: &GlobalDataSummary,
    ) -> Result<Vec<DataClientRequest>, Error> {
        let optimal_chunk_sizes = &global_data_summary.optimal_chunk_sizes;
        let (request_end_version, chunk_size) = match &self.request {
            StreamRequest::GetAllTransactions(request) => (
                request.end_version,
                optimal_chunk_sizes.transaction_chunk_size,
            ),
            StreamRequest::GetAllTransactionOutputs(request) => (
                request.end_version,
                optimal_chunk_sizes.transaction_output_chunk_size,
            ),
            request => invalid_stream_request!(request),
        };
        let client_requests = create_data_client_requests(
            self.next_request_version,
            request_end_version,
            max_number_of_requests,
            chunk_size,
            self.clone().into(),
        )?;
        self.update_request_tracking(&client_requests)?;
        Ok(client_requests)
    }
    /// Returns true iff the versions still to be streamed are advertised.
    fn is_remaining_data_available(&self, advertised_data: &AdvertisedData) -> bool {
        let (request_end_version, advertised_ranges) = match &self.request {
            StreamRequest::GetAllTransactions(request) => {
                (request.end_version, &advertised_data.transactions)
            }
            StreamRequest::GetAllTransactionOutputs(request) => {
                (request.end_version, &advertised_data.transaction_outputs)
            }
            request => invalid_stream_request!(request),
        };
        AdvertisedData::contains_range(
            self.next_stream_version,
            request_end_version,
            advertised_ranges,
        )
    }
    /// Returns true iff all requested versions have been sent.
    fn is_stream_complete(&self) -> bool {
        self.stream_is_complete
    }
    /// Advances the stream version tracking for the received chunk and wraps
    /// the response payload in a data notification for the listener.
    fn transform_client_response_into_notification(
        &mut self,
        client_request: &DataClientRequest,
        client_response_payload: ResponsePayload,
        notification_id_generator: Arc<U64IdGenerator>,
    ) -> Result<Option<DataNotification>, Error> {
        match &self.request {
            StreamRequest::GetAllTransactions(stream_request) => {
                let stream_end_version = stream_request.end_version;
                match client_request {
                    TransactionsWithProof(request) => {
                        self.update_stream_version(
                            request.start_version,
                            request.end_version,
                            stream_end_version,
                        )?;
                    }
                    request => invalid_client_request!(request, self),
                }
            }
            StreamRequest::GetAllTransactionOutputs(stream_request) => {
                let stream_end_version = stream_request.end_version;
                match client_request {
                    TransactionOutputsWithProof(request) => {
                        self.update_stream_version(
                            request.start_version,
                            request.end_version,
                            stream_end_version,
                        )?;
                    }
                    request => invalid_client_request!(request, self),
                }
            }
            request => invalid_stream_request!(request),
        }
        Ok(Some(create_data_notification(
            notification_id_generator,
            client_response_payload,
            None,
            self.clone().into(),
        )))
    }
}
/// Sanity-checks that a client request/response covers exactly the expected
/// window: it must start at the expected next index and must not end before
/// it. Panics on violation (these are internal invariants, not user errors).
fn verify_client_request_indices(expected_next_index: u64, start_index: u64, end_index: u64) {
    assert!(
        start_index == expected_next_index,
        "The start index did not match the expected next index! Given: {:?}, expected: {:?}",
        start_index,
        expected_next_index
    );
    assert!(
        end_index >= expected_next_index,
        "The end index was less than the expected next index! Given: {:?}, expected: {:?}",
        end_index,
        expected_next_index
    );
}
fn create_data_client_requests(
start_index: u64,
end_index: u64,
max_number_of_requests: u64,
optimal_chunk_size: u64,
stream_engine: StreamEngine,
) -> Result<Vec<DataClientRequest>, Error> {
if start_index > end_index {
return Ok(vec![]);
}
let mut total_items_to_fetch = end_index
.checked_sub(start_index)
.and_then(|e| e.checked_add(1)) .ok_or_else(|| Error::IntegerOverflow("Total items to fetch has overflown!".into()))?;
let mut data_client_requests = vec![];
let mut num_requests_made = 0;
let mut next_index_to_request = start_index;
while total_items_to_fetch > 0 && num_requests_made < max_number_of_requests {
let num_items_to_fetch = cmp::min(total_items_to_fetch, optimal_chunk_size);
let request_start_index = next_index_to_request;
let request_end_index = request_start_index
.checked_add(num_items_to_fetch)
.and_then(|e| e.checked_sub(1)) .ok_or_else(|| Error::IntegerOverflow("End index to fetch has overflown!".into()))?;
let data_client_request =
create_data_client_request(request_start_index, request_end_index, &stream_engine);
data_client_requests.push(data_client_request);
next_index_to_request = request_end_index
.checked_add(1)
.ok_or_else(|| Error::IntegerOverflow("Next index to request has overflown!".into()))?;
total_items_to_fetch = total_items_to_fetch
.checked_sub(num_items_to_fetch)
.ok_or_else(|| Error::IntegerOverflow("Total items to fetch has overflown!".into()))?;
num_requests_made = num_requests_made.checked_add(1).ok_or_else(|| {
Error::IntegerOverflow("Number of payload requests has overflown!".into())
})?;
}
Ok(data_client_requests)
}
/// Builds a single data client request covering the inclusive range
/// [start_index, end_index], with the request type dictated by the given
/// stream engine (and, for transaction engines, by its stream request).
/// Panics if the engine holds a stream request it cannot serve.
fn create_data_client_request(
    start_index: u64,
    end_index: u64,
    stream_engine: &StreamEngine,
) -> DataClientRequest {
    match stream_engine {
        StreamEngine::StateStreamEngine(engine) => {
            // State values are fetched at the version held by the stream request
            DataClientRequest::StateValuesWithProof(StateValuesWithProofRequest {
                version: engine.request.version,
                start_index,
                end_index,
            })
        }
        StreamEngine::EpochEndingStreamEngine(_) => {
            // For epoch ending streams, the indices are epoch numbers
            DataClientRequest::EpochEndingLedgerInfos(EpochEndingLedgerInfosRequest {
                start_epoch: start_index,
                end_epoch: end_index,
            })
        }
        StreamEngine::ContinuousTransactionStreamEngine(engine) => {
            // Proofs are requested at the version of the current target ledger info
            let proof_version = engine.get_target_ledger_info().ledger_info().version();
            match &engine.request {
                StreamRequest::ContinuouslyStreamTransactions(request) => {
                    DataClientRequest::TransactionsWithProof(TransactionsWithProofRequest {
                        start_version: start_index,
                        end_version: end_index,
                        proof_version,
                        include_events: request.include_events,
                    })
                }
                StreamRequest::ContinuouslyStreamTransactionOutputs(_) => {
                    DataClientRequest::TransactionOutputsWithProof(
                        TransactionOutputsWithProofRequest {
                            start_version: start_index,
                            end_version: end_index,
                            proof_version,
                        },
                    )
                }
                request => invalid_stream_request!(request),
            }
        }
        StreamEngine::TransactionStreamEngine(engine) => match &engine.request {
            StreamRequest::GetAllTransactions(request) => {
                // The proof version is fixed by the original stream request
                DataClientRequest::TransactionsWithProof(TransactionsWithProofRequest {
                    start_version: start_index,
                    end_version: end_index,
                    proof_version: request.proof_version,
                    include_events: request.include_events,
                })
            }
            StreamRequest::GetAllTransactionOutputs(request) => {
                DataClientRequest::TransactionOutputsWithProof(TransactionOutputsWithProofRequest {
                    start_version: start_index,
                    end_version: end_index,
                    proof_version: request.proof_version,
                })
            }
            request => invalid_stream_request!(request),
        },
    }
}
/// Constructs a data notification (with a freshly generated notification ID)
/// for the given client response. The resulting payload type depends on both
/// the response variant and the stream engine that issued the request; a
/// mismatch between the two is an invariant violation and panics.
fn create_data_notification(
    notification_id_generator: Arc<U64IdGenerator>,
    client_response: ResponsePayload,
    target_ledger_info: Option<LedgerInfoWithSignatures>,
    stream_engine: StreamEngine,
) -> DataNotification {
    let notification_id = notification_id_generator.next();

    // Capture the response label up front, for use in mismatch panics
    let client_response_type = client_response.get_label();

    let data_payload = match client_response {
        ResponsePayload::EpochEndingLedgerInfos(ledger_infos) => {
            DataPayload::EpochEndingLedgerInfos(ledger_infos)
        }
        ResponsePayload::StateValuesWithProof(state_value_chunk) => {
            DataPayload::StateValuesWithProof(state_value_chunk)
        }
        ResponsePayload::NewTransactionsWithProof((transaction_chunk, response_target)) => {
            // New transactions are only valid for continuous streams; the
            // target ledger info is carried inside the response itself.
            match stream_engine {
                StreamEngine::ContinuousTransactionStreamEngine(_) => {
                    DataPayload::ContinuousTransactionsWithProof(
                        response_target,
                        transaction_chunk,
                    )
                }
                _ => invalid_response_type!(client_response_type),
            }
        }
        ResponsePayload::NewTransactionOutputsWithProof((output_chunk, response_target)) => {
            // New transaction outputs are likewise continuous-stream-only
            match stream_engine {
                StreamEngine::ContinuousTransactionStreamEngine(_) => {
                    DataPayload::ContinuousTransactionOutputsWithProof(
                        response_target,
                        output_chunk,
                    )
                }
                _ => invalid_response_type!(client_response_type),
            }
        }
        ResponsePayload::TransactionsWithProof(transaction_chunk) => match stream_engine {
            StreamEngine::TransactionStreamEngine(_) => {
                DataPayload::TransactionsWithProof(transaction_chunk)
            }
            StreamEngine::ContinuousTransactionStreamEngine(_) => {
                // Continuous streams pair the chunk with the caller-supplied target
                DataPayload::ContinuousTransactionsWithProof(
                    target_ledger_info.expect("A target ledger info is required!"),
                    transaction_chunk,
                )
            }
            _ => invalid_response_type!(client_response_type),
        },
        ResponsePayload::TransactionOutputsWithProof(output_chunk) => match stream_engine {
            StreamEngine::TransactionStreamEngine(_) => {
                DataPayload::TransactionOutputsWithProof(output_chunk)
            }
            StreamEngine::ContinuousTransactionStreamEngine(_) => {
                DataPayload::ContinuousTransactionOutputsWithProof(
                    target_ledger_info.expect("A target ledger info is required!"),
                    output_chunk,
                )
            }
            _ => invalid_response_type!(client_response_type),
        },
        _ => invalid_response_type!(client_response_type),
    };

    DataNotification {
        notification_id,
        data_payload,
    }
}