1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use crate::metrics;
use aptos_config::config::StorageServiceConfig;
use aptos_types::PeerId;
use bytes::Bytes;
use channel::{aptos_channel, message_queues::QueueStyle};
use futures::{
channel::oneshot,
future,
stream::{BoxStream, Stream, StreamExt},
};
use network::{
peer_manager::{ConnectionNotification, PeerManagerNotification},
protocols::network::{AppConfig, Event, NetworkEvents, NewNetworkEvents, RpcError},
ProtocolId,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
use storage_service_types::{
Result, StorageServiceMessage, StorageServiceRequest, StorageServiceResponse,
};
pub fn network_endpoint_config(storage_config: StorageServiceConfig) -> AppConfig {
let max_network_channel_size = storage_config.max_network_channel_size as usize;
AppConfig::service(
[ProtocolId::StorageServiceRpc],
aptos_channel::Config::new(max_network_channel_size)
.queue_style(QueueStyle::FIFO)
.counters(&metrics::PENDING_STORAGE_SERVER_NETWORK_EVENTS),
)
}
pub type NetworkRequest = (PeerId, ProtocolId, StorageServiceRequest, ResponseSender);
pub struct StorageServiceNetworkEvents(BoxStream<'static, NetworkRequest>);
impl NewNetworkEvents for StorageServiceNetworkEvents {
fn new(
peer_mgr_notifs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerNotification>,
connection_notifs_rx: aptos_channel::Receiver<PeerId, ConnectionNotification>,
) -> Self {
let events = NetworkEvents::new(peer_mgr_notifs_rx, connection_notifs_rx)
.filter_map(|event| future::ready(Self::event_to_request(event)))
.boxed();
Self(events)
}
}
impl StorageServiceNetworkEvents {
fn event_to_request(event: Event<StorageServiceMessage>) -> Option<NetworkRequest> {
match event {
Event::RpcRequest(
peer_id,
StorageServiceMessage::Request(request),
protocol_id,
response_tx,
) => {
let response_tx = ResponseSender::new(response_tx);
Some((peer_id, protocol_id, request, response_tx))
}
_ => None,
}
}
}
impl Stream for StorageServiceNetworkEvents {
type Item = NetworkRequest;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_next(cx)
}
}
pub struct ResponseSender {
response_tx: oneshot::Sender<Result<Bytes, RpcError>>,
}
impl ResponseSender {
pub fn new(response_tx: oneshot::Sender<Result<Bytes, RpcError>>) -> Self {
Self { response_tx }
}
pub fn send(self, response: Result<StorageServiceResponse>) {
let msg = StorageServiceMessage::Response(response);
let result = bcs::to_bytes(&msg)
.map(Bytes::from)
.map_err(RpcError::BcsError);
let _ = self.response_tx.send(result);
}
}