kaspa_p2p_lib/core/
connection_handler.rs

1use crate::common::ProtocolError;
2use crate::core::hub::HubEvent;
3use crate::pb::{
4    p2p_client::P2pClient as ProtoP2pClient, p2p_server::P2p as ProtoP2p, p2p_server::P2pServer as ProtoP2pServer, KaspadMessage,
5};
6use crate::{ConnectionInitializer, Router};
7use futures::FutureExt;
8use kaspa_core::{debug, info};
9use kaspa_utils::networking::NetAddress;
10use kaspa_utils_tower::{
11    counters::TowerConnectionCounters,
12    middleware::{measure_request_body_size_layer, CountBytesBody, MapResponseBodyLayer, ServiceBuilder},
13};
14use std::net::ToSocketAddrs;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Duration;
18use thiserror::Error;
19use tokio::sync::mpsc::{channel as mpsc_channel, Sender as MpscSender};
20use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
21use tokio_stream::wrappers::ReceiverStream;
22use tokio_stream::StreamExt;
23use tonic::codegen::Body;
24use tonic::transport::{Error as TonicError, Server as TonicServer};
25use tonic::{Request, Response, Status as TonicStatus, Streaming};
26
27#[derive(Error, Debug)]
28pub enum ConnectionError {
29    #[error("missing socket address")]
30    NoAddress,
31
32    #[error("{0}")]
33    IoError(#[from] std::io::Error),
34
35    #[error("{0}")]
36    TonicError(#[from] TonicError),
37
38    #[error("{0}")]
39    TonicStatus(#[from] TonicStatus),
40
41    #[error("{0}")]
42    ProtocolError(#[from] ProtocolError),
43}
44
/// Upper bound, in bytes, on a single decoded gRPC P2P message (applied to both the
/// server and the client side of a connection)
const P2P_MAX_MESSAGE_SIZE: usize = 1 << 30; // 1GB
47
48/// Handles Router creation for both server and client-side new connections
49#[derive(Clone)]
50pub struct ConnectionHandler {
51    /// Cloned on each new connection so that routers can communicate with a central hub
52    hub_sender: MpscSender<HubEvent>,
53    initializer: Arc<dyn ConnectionInitializer>,
54    counters: Arc<TowerConnectionCounters>,
55}
56
57impl ConnectionHandler {
58    pub(crate) fn new(
59        hub_sender: MpscSender<HubEvent>,
60        initializer: Arc<dyn ConnectionInitializer>,
61        counters: Arc<TowerConnectionCounters>,
62    ) -> Self {
63        Self { hub_sender, initializer, counters }
64    }
65
66    /// Launches a P2P server listener loop
67    pub(crate) fn serve(&self, serve_address: NetAddress) -> Result<OneshotSender<()>, ConnectionError> {
68        let (termination_sender, termination_receiver) = oneshot_channel::<()>();
69        let connection_handler = self.clone();
70        info!("P2P Server starting on: {}", serve_address);
71
72        let bytes_tx = self.counters.bytes_tx.clone();
73        let bytes_rx = self.counters.bytes_rx.clone();
74
75        tokio::spawn(async move {
76            let proto_server = ProtoP2pServer::new(connection_handler)
77                .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
78                .send_compressed(tonic::codec::CompressionEncoding::Gzip)
79                .max_decoding_message_size(P2P_MAX_MESSAGE_SIZE);
80
81            // TODO: check whether we should set tcp_keepalive
82            let serve_result = TonicServer::builder()
83                .layer(measure_request_body_size_layer(bytes_rx, |b| b))
84                .layer(MapResponseBodyLayer::new(move |body| CountBytesBody::new(body, bytes_tx.clone())))
85                .add_service(proto_server)
86                .serve_with_shutdown(serve_address.into(), termination_receiver.map(drop))
87                .await;
88
89            match serve_result {
90                Ok(_) => info!("P2P Server stopped: {}", serve_address),
91                Err(err) => panic!("P2P, Server {serve_address} stopped with error: {err:?}"),
92            }
93        });
94        Ok(termination_sender)
95    }
96
97    /// Connect to a new peer
98    pub(crate) async fn connect(&self, peer_address: String) -> Result<Arc<Router>, ConnectionError> {
99        let Some(socket_address) = peer_address.to_socket_addrs()?.next() else {
100            return Err(ConnectionError::NoAddress);
101        };
102        let peer_address = format!("http://{}", peer_address); // Add scheme prefix as required by Tonic
103
104        let channel = tonic::transport::Endpoint::new(peer_address)?
105            .timeout(Duration::from_millis(Self::communication_timeout()))
106            .connect_timeout(Duration::from_millis(Self::connect_timeout()))
107            .tcp_keepalive(Some(Duration::from_millis(Self::keep_alive())))
108            .connect()
109            .await?;
110
111        let channel = ServiceBuilder::new()
112            .layer(MapResponseBodyLayer::new(move |body| CountBytesBody::new(body, self.counters.bytes_rx.clone())))
113            .layer(measure_request_body_size_layer(self.counters.bytes_tx.clone(), |body| {
114                body.map_err(|e| tonic::Status::from_error(Box::new(e))).boxed_unsync()
115            }))
116            .service(channel);
117
118        let mut client = ProtoP2pClient::new(channel)
119            .send_compressed(tonic::codec::CompressionEncoding::Gzip)
120            .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
121            .max_decoding_message_size(P2P_MAX_MESSAGE_SIZE);
122
123        let (outgoing_route, outgoing_receiver) = mpsc_channel(Self::outgoing_network_channel_size());
124        let incoming_stream = client.message_stream(ReceiverStream::new(outgoing_receiver)).await?.into_inner();
125
126        let router = Router::new(socket_address, true, self.hub_sender.clone(), incoming_stream, outgoing_route).await;
127
128        // For outbound peers, we perform the initialization as part of the connect logic
129        match self.initializer.initialize_connection(router.clone()).await {
130            Ok(()) => {
131                // Notify the central Hub about the new peer
132                self.hub_sender.send(HubEvent::NewPeer(router.clone())).await.expect("hub receiver should never drop before senders");
133            }
134
135            Err(err) => {
136                router.try_sending_reject_message(&err).await;
137                // Ignoring the new router
138                router.close().await;
139                debug!("P2P, handshake failed for outbound peer {}: {}", router, err);
140                return Err(ConnectionError::ProtocolError(err));
141            }
142        }
143
144        Ok(router)
145    }
146
147    /// Connect to a new peer with `retry_attempts` retries and `retry_interval` duration between each attempt
148    pub(crate) async fn connect_with_retry(
149        &self,
150        address: String,
151        retry_attempts: u8,
152        retry_interval: Duration,
153    ) -> Result<Arc<Router>, ConnectionError> {
154        let mut counter = 0;
155        loop {
156            counter += 1;
157            match self.connect(address.clone()).await {
158                Ok(router) => {
159                    debug!("P2P, Client connected, peer: {:?}", address);
160                    return Ok(router);
161                }
162                Err(ConnectionError::ProtocolError(err)) => {
163                    // On protocol errors we avoid retrying
164                    debug!("P2P, connect retry #{} failed with error {:?}, peer: {:?}, aborting retries", counter, err, address);
165                    return Err(ConnectionError::ProtocolError(err));
166                }
167                Err(err) => {
168                    debug!("P2P, connect retry #{} failed with error {:?}, peer: {:?}", counter, err, address);
169                    if counter < retry_attempts {
170                        // Await `retry_interval` time before retrying
171                        tokio::time::sleep(retry_interval).await;
172                    } else {
173                        debug!("P2P, Client connection retry #{} - all failed", retry_attempts);
174                        return Err(err);
175                    }
176                }
177            }
178        }
179    }
180
181    // TODO: revisit the below constants
182    fn outgoing_network_channel_size() -> usize {
183        // TODO: this number is taken from go-kaspad and should be re-evaluated
184        (1 << 17) + 256
185    }
186
187    fn communication_timeout() -> u64 {
188        10_000
189    }
190
191    fn keep_alive() -> u64 {
192        10_000
193    }
194
195    fn connect_timeout() -> u64 {
196        1_000
197    }
198}
199
200#[tonic::async_trait]
201impl ProtoP2p for ConnectionHandler {
202    type MessageStreamStream = Pin<Box<dyn futures::Stream<Item = Result<KaspadMessage, TonicStatus>> + Send + 'static>>;
203
204    /// Handle the new arriving **server** connections
205    async fn message_stream(
206        &self,
207        request: Request<Streaming<KaspadMessage>>,
208    ) -> Result<Response<Self::MessageStreamStream>, TonicStatus> {
209        let Some(remote_address) = request.remote_addr() else {
210            return Err(TonicStatus::new(tonic::Code::InvalidArgument, "Incoming connection opening request has no remote address"));
211        };
212
213        // Build the in/out pipes
214        let (outgoing_route, outgoing_receiver) = mpsc_channel(Self::outgoing_network_channel_size());
215        let incoming_stream = request.into_inner();
216
217        // Build the router object
218        let router = Router::new(remote_address, false, self.hub_sender.clone(), incoming_stream, outgoing_route).await;
219
220        // Notify the central Hub about the new peer
221        self.hub_sender.send(HubEvent::NewPeer(router)).await.expect("hub receiver should never drop before senders");
222
223        // Give tonic a receiver stream (messages sent to it will be forwarded to the network peer)
224        Ok(Response::new(Box::pin(ReceiverStream::new(outgoing_receiver).map(Ok)) as Self::MessageStreamStream))
225    }
226}