use crate::{data_notification::NotificationId, data_stream::DataStreamListener, error::Error};
use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::Version};
use async_trait::async_trait;
use futures::{
channel::{mpsc, oneshot},
stream::FusedStream,
SinkExt, Stream,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
pub type Epoch = u64;
#[async_trait]
pub trait DataStreamingClient {
async fn get_all_state_values(
&self,
version: Version,
start_index: Option<u64>,
) -> Result<DataStreamListener, Error>;
async fn get_all_epoch_ending_ledger_infos(
&self,
start_epoch: Epoch,
) -> Result<DataStreamListener, Error>;
async fn get_all_transaction_outputs(
&self,
start_version: Version,
end_version: Version,
proof_version: Version,
) -> Result<DataStreamListener, Error>;
async fn get_all_transactions(
&self,
start_version: Version,
end_version: Version,
proof_version: Version,
include_events: bool,
) -> Result<DataStreamListener, Error>;
async fn continuously_stream_transaction_outputs(
&self,
known_version: u64,
known_epoch: u64,
target: Option<LedgerInfoWithSignatures>,
) -> Result<DataStreamListener, Error>;
async fn continuously_stream_transactions(
&self,
start_version: Version,
start_epoch: Epoch,
include_events: bool,
target: Option<LedgerInfoWithSignatures>,
) -> Result<DataStreamListener, Error>;
async fn terminate_stream_with_feedback(
&self,
notification_id: NotificationId,
notification_feedback: NotificationFeedback,
) -> Result<(), Error>;
}
#[derive(Debug)]
pub struct StreamRequestMessage {
pub stream_request: StreamRequest,
pub response_sender: oneshot::Sender<Result<DataStreamListener, Error>>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StreamRequest {
GetAllEpochEndingLedgerInfos(GetAllEpochEndingLedgerInfosRequest),
GetAllStates(GetAllStatesRequest),
GetAllTransactions(GetAllTransactionsRequest),
GetAllTransactionOutputs(GetAllTransactionOutputsRequest),
ContinuouslyStreamTransactions(ContinuouslyStreamTransactionsRequest),
ContinuouslyStreamTransactionOutputs(ContinuouslyStreamTransactionOutputsRequest),
TerminateStream(TerminateStreamRequest),
}
impl StreamRequest {
pub fn get_label(&self) -> &'static str {
match self {
Self::GetAllEpochEndingLedgerInfos(_) => "get_all_epoch_ending_ledger_infos",
Self::GetAllStates(_) => "get_all_states",
Self::GetAllTransactions(_) => "get_all_transactions",
Self::GetAllTransactionOutputs(_) => "get_all_transaction_outputs",
Self::ContinuouslyStreamTransactions(_) => "continuously_stream_transactions",
Self::ContinuouslyStreamTransactionOutputs(_) => {
"continuously_stream_transaction_outputs"
}
Self::TerminateStream(_) => "terminate_stream",
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GetAllEpochEndingLedgerInfosRequest {
pub start_epoch: Epoch,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GetAllStatesRequest {
pub version: Version,
pub start_index: u64,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GetAllTransactionsRequest {
pub start_version: Version,
pub end_version: Version,
pub proof_version: Version,
pub include_events: bool,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GetAllTransactionOutputsRequest {
pub start_version: Version,
pub end_version: Version,
pub proof_version: Version,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ContinuouslyStreamTransactionsRequest {
pub known_version: Version,
pub known_epoch: Epoch,
pub include_events: bool,
pub target: Option<LedgerInfoWithSignatures>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ContinuouslyStreamTransactionOutputsRequest {
pub known_version: Version,
pub known_epoch: Epoch,
pub target: Option<LedgerInfoWithSignatures>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TerminateStreamRequest {
pub notification_id: NotificationId,
pub notification_feedback: NotificationFeedback,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum NotificationFeedback {
EmptyPayloadData,
EndOfStream,
InvalidPayloadData,
PayloadProofFailed,
PayloadTypeIsIncorrect,
}
impl NotificationFeedback {
pub fn get_label(&self) -> &'static str {
match self {
Self::EmptyPayloadData => "empty_payload_data",
Self::EndOfStream => "end_of_stream",
Self::InvalidPayloadData => "invalid_payload_data",
Self::PayloadProofFailed => "payload_proof_failed",
Self::PayloadTypeIsIncorrect => "payload_type_is_correct",
}
}
}
#[derive(Clone)]
pub struct StreamingServiceClient {
request_sender: mpsc::UnboundedSender<StreamRequestMessage>,
}
impl StreamingServiceClient {
pub fn new(request_sender: mpsc::UnboundedSender<StreamRequestMessage>) -> Self {
Self { request_sender }
}
async fn send_stream_request(
&self,
client_request: StreamRequest,
) -> Result<oneshot::Receiver<Result<DataStreamListener, Error>>, Error> {
let mut request_sender = self.request_sender.clone();
let (response_sender, response_receiver) = oneshot::channel();
let request_message = StreamRequestMessage {
stream_request: client_request,
response_sender,
};
request_sender.send(request_message).await?;
Ok(response_receiver)
}
async fn send_request_and_await_response(
&self,
client_request: StreamRequest,
) -> Result<DataStreamListener, Error> {
let response_receiver = self.send_stream_request(client_request).await?;
response_receiver.await?
}
}
#[async_trait]
impl DataStreamingClient for StreamingServiceClient {
async fn get_all_state_values(
&self,
version: u64,
start_index: Option<u64>,
) -> Result<DataStreamListener, Error> {
let start_index = start_index.unwrap_or(0);
let client_request = StreamRequest::GetAllStates(GetAllStatesRequest {
version,
start_index,
});
self.send_request_and_await_response(client_request).await
}
async fn get_all_epoch_ending_ledger_infos(
&self,
start_epoch: u64,
) -> Result<DataStreamListener, Error> {
let client_request =
StreamRequest::GetAllEpochEndingLedgerInfos(GetAllEpochEndingLedgerInfosRequest {
start_epoch,
});
self.send_request_and_await_response(client_request).await
}
async fn get_all_transaction_outputs(
&self,
start_version: u64,
end_version: u64,
proof_version: u64,
) -> Result<DataStreamListener, Error> {
let client_request =
StreamRequest::GetAllTransactionOutputs(GetAllTransactionOutputsRequest {
start_version,
end_version,
proof_version,
});
self.send_request_and_await_response(client_request).await
}
async fn get_all_transactions(
&self,
start_version: u64,
end_version: u64,
proof_version: u64,
include_events: bool,
) -> Result<DataStreamListener, Error> {
let client_request = StreamRequest::GetAllTransactions(GetAllTransactionsRequest {
start_version,
end_version,
proof_version,
include_events,
});
self.send_request_and_await_response(client_request).await
}
async fn continuously_stream_transaction_outputs(
&self,
known_version: u64,
known_epoch: u64,
target: Option<LedgerInfoWithSignatures>,
) -> Result<DataStreamListener, Error> {
let client_request = StreamRequest::ContinuouslyStreamTransactionOutputs(
ContinuouslyStreamTransactionOutputsRequest {
known_version,
known_epoch,
target,
},
);
self.send_request_and_await_response(client_request).await
}
async fn continuously_stream_transactions(
&self,
known_version: u64,
known_epoch: u64,
include_events: bool,
target: Option<LedgerInfoWithSignatures>,
) -> Result<DataStreamListener, Error> {
let client_request =
StreamRequest::ContinuouslyStreamTransactions(ContinuouslyStreamTransactionsRequest {
known_version,
known_epoch,
include_events,
target,
});
self.send_request_and_await_response(client_request).await
}
async fn terminate_stream_with_feedback(
&self,
notification_id: u64,
notification_feedback: NotificationFeedback,
) -> Result<(), Error> {
let client_request = StreamRequest::TerminateStream(TerminateStreamRequest {
notification_id,
notification_feedback,
});
let _ = self.send_stream_request(client_request).await?;
Ok(())
}
}
#[derive(Debug)]
pub struct StreamingServiceListener {
request_receiver: mpsc::UnboundedReceiver<StreamRequestMessage>,
}
impl StreamingServiceListener {
pub fn new(request_receiver: mpsc::UnboundedReceiver<StreamRequestMessage>) -> Self {
Self { request_receiver }
}
}
impl Stream for StreamingServiceListener {
type Item = StreamRequestMessage;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.get_mut().request_receiver).poll_next(cx)
}
}
impl FusedStream for StreamingServiceListener {
fn is_terminated(&self) -> bool {
self.request_receiver.is_terminated()
}
}
pub fn new_streaming_service_client_listener_pair(
) -> (StreamingServiceClient, StreamingServiceListener) {
let (request_sender, request_listener) = mpsc::unbounded();
let streaming_service_client = StreamingServiceClient::new(request_sender);
let streaming_service_listener = StreamingServiceListener::new(request_listener);
(streaming_service_client, streaming_service_listener)
}