1use crate::common::ProtocolError;
2use crate::core::hub::HubEvent;
3use crate::pb::{
4 p2p_client::P2pClient as ProtoP2pClient, p2p_server::P2p as ProtoP2p, p2p_server::P2pServer as ProtoP2pServer, KaspadMessage,
5};
6use crate::{ConnectionInitializer, Router};
7use futures::FutureExt;
8use kaspa_core::{debug, info};
9use kaspa_utils::networking::NetAddress;
10use kaspa_utils_tower::{
11 counters::TowerConnectionCounters,
12 middleware::{measure_request_body_size_layer, CountBytesBody, MapResponseBodyLayer, ServiceBuilder},
13};
14use std::net::ToSocketAddrs;
15use std::pin::Pin;
16use std::sync::Arc;
17use std::time::Duration;
18use thiserror::Error;
19use tokio::sync::mpsc::{channel as mpsc_channel, Sender as MpscSender};
20use tokio::sync::oneshot::{channel as oneshot_channel, Sender as OneshotSender};
21use tokio_stream::wrappers::ReceiverStream;
22use tokio_stream::StreamExt;
23use tonic::codegen::Body;
24use tonic::transport::{Error as TonicError, Server as TonicServer};
25use tonic::{Request, Response, Status as TonicStatus, Streaming};
26
27#[derive(Error, Debug)]
28pub enum ConnectionError {
29 #[error("missing socket address")]
30 NoAddress,
31
32 #[error("{0}")]
33 IoError(#[from] std::io::Error),
34
35 #[error("{0}")]
36 TonicError(#[from] TonicError),
37
38 #[error("{0}")]
39 TonicStatus(#[from] TonicStatus),
40
41 #[error("{0}")]
42 ProtocolError(#[from] ProtocolError),
43}
44
45const P2P_MAX_MESSAGE_SIZE: usize = 1024 * 1024 * 1024; #[derive(Clone)]
50pub struct ConnectionHandler {
51 hub_sender: MpscSender<HubEvent>,
53 initializer: Arc<dyn ConnectionInitializer>,
54 counters: Arc<TowerConnectionCounters>,
55}
56
57impl ConnectionHandler {
58 pub(crate) fn new(
59 hub_sender: MpscSender<HubEvent>,
60 initializer: Arc<dyn ConnectionInitializer>,
61 counters: Arc<TowerConnectionCounters>,
62 ) -> Self {
63 Self { hub_sender, initializer, counters }
64 }
65
66 pub(crate) fn serve(&self, serve_address: NetAddress) -> Result<OneshotSender<()>, ConnectionError> {
68 let (termination_sender, termination_receiver) = oneshot_channel::<()>();
69 let connection_handler = self.clone();
70 info!("P2P Server starting on: {}", serve_address);
71
72 let bytes_tx = self.counters.bytes_tx.clone();
73 let bytes_rx = self.counters.bytes_rx.clone();
74
75 tokio::spawn(async move {
76 let proto_server = ProtoP2pServer::new(connection_handler)
77 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
78 .send_compressed(tonic::codec::CompressionEncoding::Gzip)
79 .max_decoding_message_size(P2P_MAX_MESSAGE_SIZE);
80
81 let serve_result = TonicServer::builder()
83 .layer(measure_request_body_size_layer(bytes_rx, |b| b))
84 .layer(MapResponseBodyLayer::new(move |body| CountBytesBody::new(body, bytes_tx.clone())))
85 .add_service(proto_server)
86 .serve_with_shutdown(serve_address.into(), termination_receiver.map(drop))
87 .await;
88
89 match serve_result {
90 Ok(_) => info!("P2P Server stopped: {}", serve_address),
91 Err(err) => panic!("P2P, Server {serve_address} stopped with error: {err:?}"),
92 }
93 });
94 Ok(termination_sender)
95 }
96
97 pub(crate) async fn connect(&self, peer_address: String) -> Result<Arc<Router>, ConnectionError> {
99 let Some(socket_address) = peer_address.to_socket_addrs()?.next() else {
100 return Err(ConnectionError::NoAddress);
101 };
102 let peer_address = format!("http://{}", peer_address); let channel = tonic::transport::Endpoint::new(peer_address)?
105 .timeout(Duration::from_millis(Self::communication_timeout()))
106 .connect_timeout(Duration::from_millis(Self::connect_timeout()))
107 .tcp_keepalive(Some(Duration::from_millis(Self::keep_alive())))
108 .connect()
109 .await?;
110
111 let channel = ServiceBuilder::new()
112 .layer(MapResponseBodyLayer::new(move |body| CountBytesBody::new(body, self.counters.bytes_rx.clone())))
113 .layer(measure_request_body_size_layer(self.counters.bytes_tx.clone(), |body| {
114 body.map_err(|e| tonic::Status::from_error(Box::new(e))).boxed_unsync()
115 }))
116 .service(channel);
117
118 let mut client = ProtoP2pClient::new(channel)
119 .send_compressed(tonic::codec::CompressionEncoding::Gzip)
120 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
121 .max_decoding_message_size(P2P_MAX_MESSAGE_SIZE);
122
123 let (outgoing_route, outgoing_receiver) = mpsc_channel(Self::outgoing_network_channel_size());
124 let incoming_stream = client.message_stream(ReceiverStream::new(outgoing_receiver)).await?.into_inner();
125
126 let router = Router::new(socket_address, true, self.hub_sender.clone(), incoming_stream, outgoing_route).await;
127
128 match self.initializer.initialize_connection(router.clone()).await {
130 Ok(()) => {
131 self.hub_sender.send(HubEvent::NewPeer(router.clone())).await.expect("hub receiver should never drop before senders");
133 }
134
135 Err(err) => {
136 router.try_sending_reject_message(&err).await;
137 router.close().await;
139 debug!("P2P, handshake failed for outbound peer {}: {}", router, err);
140 return Err(ConnectionError::ProtocolError(err));
141 }
142 }
143
144 Ok(router)
145 }
146
147 pub(crate) async fn connect_with_retry(
149 &self,
150 address: String,
151 retry_attempts: u8,
152 retry_interval: Duration,
153 ) -> Result<Arc<Router>, ConnectionError> {
154 let mut counter = 0;
155 loop {
156 counter += 1;
157 match self.connect(address.clone()).await {
158 Ok(router) => {
159 debug!("P2P, Client connected, peer: {:?}", address);
160 return Ok(router);
161 }
162 Err(ConnectionError::ProtocolError(err)) => {
163 debug!("P2P, connect retry #{} failed with error {:?}, peer: {:?}, aborting retries", counter, err, address);
165 return Err(ConnectionError::ProtocolError(err));
166 }
167 Err(err) => {
168 debug!("P2P, connect retry #{} failed with error {:?}, peer: {:?}", counter, err, address);
169 if counter < retry_attempts {
170 tokio::time::sleep(retry_interval).await;
172 } else {
173 debug!("P2P, Client connection retry #{} - all failed", retry_attempts);
174 return Err(err);
175 }
176 }
177 }
178 }
179 }
180
181 fn outgoing_network_channel_size() -> usize {
183 (1 << 17) + 256
185 }
186
187 fn communication_timeout() -> u64 {
188 10_000
189 }
190
191 fn keep_alive() -> u64 {
192 10_000
193 }
194
195 fn connect_timeout() -> u64 {
196 1_000
197 }
198}
199
200#[tonic::async_trait]
201impl ProtoP2p for ConnectionHandler {
202 type MessageStreamStream = Pin<Box<dyn futures::Stream<Item = Result<KaspadMessage, TonicStatus>> + Send + 'static>>;
203
204 async fn message_stream(
206 &self,
207 request: Request<Streaming<KaspadMessage>>,
208 ) -> Result<Response<Self::MessageStreamStream>, TonicStatus> {
209 let Some(remote_address) = request.remote_addr() else {
210 return Err(TonicStatus::new(tonic::Code::InvalidArgument, "Incoming connection opening request has no remote address"));
211 };
212
213 let (outgoing_route, outgoing_receiver) = mpsc_channel(Self::outgoing_network_channel_size());
215 let incoming_stream = request.into_inner();
216
217 let router = Router::new(remote_address, false, self.hub_sender.clone(), incoming_stream, outgoing_route).await;
219
220 self.hub_sender.send(HubEvent::NewPeer(router)).await.expect("hub receiver should never drop before senders");
222
223 Ok(Response::new(Box::pin(ReceiverStream::new(outgoing_receiver).map(Ok)) as Self::MessageStreamStream))
225 }
226}