use crate::{chunk_request::GetChunkRequest, chunk_response::GetChunkResponse, counters};
use aptos_types::PeerId;
use async_trait::async_trait;
use channel::{aptos_channel, message_queues::QueueStyle};
use network::{
error::NetworkError,
peer_manager::{ConnectionRequestSender, PeerManagerRequestSender},
protocols::network::{
AppConfig, ApplicationNetworkSender, NetworkEvents, NetworkSender, NewNetworkSender,
RpcError,
},
ProtocolId,
};
use serde::{Deserialize, Serialize};
use std::time::Duration;
const STATE_SYNC_MAX_BUFFER_SIZE: usize = 1;
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum StateSyncMessage {
GetChunkRequest(Box<GetChunkRequest>),
GetChunkResponse(Box<GetChunkResponse>),
}
pub type StateSyncEvents = NetworkEvents<StateSyncMessage>;
#[derive(Clone)]
pub struct StateSyncSender {
inner: NetworkSender<StateSyncMessage>,
}
impl NewNetworkSender for StateSyncSender {
fn new(
peer_mgr_reqs_tx: PeerManagerRequestSender,
connection_reqs_tx: ConnectionRequestSender,
) -> Self {
Self {
inner: NetworkSender::new(peer_mgr_reqs_tx, connection_reqs_tx),
}
}
}
#[async_trait]
impl ApplicationNetworkSender<StateSyncMessage> for StateSyncSender {
fn send_to(&self, recipient: PeerId, message: StateSyncMessage) -> Result<(), NetworkError> {
let protocol = ProtocolId::StateSyncDirectSend;
self.inner.send_to(recipient, protocol, message)
}
async fn send_rpc(
&self,
_recipient: PeerId,
_req_msg: StateSyncMessage,
_timeout: Duration,
) -> Result<StateSyncMessage, RpcError> {
unimplemented!()
}
}
pub fn network_endpoint_config() -> AppConfig {
AppConfig::p2p(
[ProtocolId::StateSyncDirectSend],
aptos_channel::Config::new(STATE_SYNC_MAX_BUFFER_SIZE)
.queue_style(QueueStyle::LIFO)
.counters(&counters::PENDING_STATE_SYNC_NETWORK_EVENTS),
)
}