1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::metrics;
use aptos_config::config::StorageServiceConfig;
use aptos_types::PeerId;
use bytes::Bytes;
use channel::{aptos_channel, message_queues::QueueStyle};
use futures::{
    channel::oneshot,
    future,
    stream::{BoxStream, Stream, StreamExt},
};
use network::{
    peer_manager::{ConnectionNotification, PeerManagerNotification},
    protocols::network::{AppConfig, Event, NetworkEvents, NewNetworkEvents, RpcError},
    ProtocolId,
};
use std::{
    pin::Pin,
    task::{Context, Poll},
};
use storage_service_types::{
    Result, StorageServiceMessage, StorageServiceRequest, StorageServiceResponse,
};

pub fn network_endpoint_config(storage_config: StorageServiceConfig) -> AppConfig {
    let max_network_channel_size = storage_config.max_network_channel_size as usize;
    AppConfig::service(
        [ProtocolId::StorageServiceRpc],
        aptos_channel::Config::new(max_network_channel_size)
            .queue_style(QueueStyle::FIFO)
            .counters(&metrics::PENDING_STORAGE_SERVER_NETWORK_EVENTS),
    )
}

pub type NetworkRequest = (PeerId, ProtocolId, StorageServiceRequest, ResponseSender);

/// A stream of requests from network. Each request also comes with a callback to
/// send the response.
pub struct StorageServiceNetworkEvents(BoxStream<'static, NetworkRequest>);

impl NewNetworkEvents for StorageServiceNetworkEvents {
    fn new(
        peer_mgr_notifs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerNotification>,
        connection_notifs_rx: aptos_channel::Receiver<PeerId, ConnectionNotification>,
    ) -> Self {
        let events = NetworkEvents::new(peer_mgr_notifs_rx, connection_notifs_rx)
            .filter_map(|event| future::ready(Self::event_to_request(event)))
            .boxed();

        Self(events)
    }
}

impl StorageServiceNetworkEvents {
    /// Filters out everything except Rpc requests
    fn event_to_request(event: Event<StorageServiceMessage>) -> Option<NetworkRequest> {
        // TODO(philiphayes): logging
        match event {
            Event::RpcRequest(
                peer_id,
                StorageServiceMessage::Request(request),
                protocol_id,
                response_tx,
            ) => {
                let response_tx = ResponseSender::new(response_tx);
                Some((peer_id, protocol_id, request, response_tx))
            }
            // We don't use DirectSend and don't care about connection events.
            _ => None,
        }
    }
}

impl Stream for StorageServiceNetworkEvents {
    type Item = NetworkRequest;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Pin::new(&mut self.0).poll_next(cx)
    }
}

/// A channel for fulfilling a pending StorageService RPC request.
/// Provides a more strongly typed interface around the raw RPC response channel.
pub struct ResponseSender {
    response_tx: oneshot::Sender<Result<Bytes, RpcError>>,
}

impl ResponseSender {
    pub fn new(response_tx: oneshot::Sender<Result<Bytes, RpcError>>) -> Self {
        Self { response_tx }
    }

    pub fn send(self, response: Result<StorageServiceResponse>) {
        let msg = StorageServiceMessage::Response(response);
        let result = bcs::to_bytes(&msg)
            .map(Bytes::from)
            .map_err(RpcError::BcsError);
        let _ = self.response_tx.send(result);
    }
}