snarkos_node_tcp/protocols/
reading.rs1#[cfg(doc)]
17use crate::{Config, protocols::Handshake};
18use crate::{
19 Connection,
20 ConnectionSide,
21 P2P,
22 Tcp,
23 protocols::{ProtocolHandler, ReturnableConnection},
24};
25
26use async_trait::async_trait;
27use bytes::BytesMut;
28use futures_util::StreamExt;
29use std::{
30 io,
31 net::SocketAddr,
32 time::{Duration, Instant},
33};
34use tokio::{
35 io::AsyncRead,
36 sync::{mpsc, oneshot},
37 time::timeout,
38};
39use tokio_util::codec::{Decoder, FramedRead};
40use tracing::*;
41
42#[async_trait]
50pub trait Reading: P2P
51where
52 Self: Clone + Send + Sync + 'static,
53{
54 fn message_queue_depth(&self) -> usize {
60 1024
61 }
62
63 const INITIAL_BUFFER_SIZE: usize = 1024 * 1024;
68
69 const IDLE_TIMEOUT: Duration = Duration::from_secs(150);
71
72 type Message: Send;
74
75 type Codec: Decoder<Item = Self::Message, Error = io::Error> + Send;
77
78 async fn enable_reading(&self) {
80 let (conn_sender, mut conn_receiver) = mpsc::channel(self.tcp().config().max_connections as usize);
81
82 let (tx_reading, rx_reading) = oneshot::channel();
84
85 let self_clone = self.clone();
87 let reading_task = tokio::spawn(async move {
88 trace!(parent: self_clone.tcp().span(), "spawned the Reading handler task");
89 tx_reading.send(()).unwrap(); while let Some(returnable_conn) = conn_receiver.recv().await {
93 self_clone.handle_new_connection(returnable_conn).await;
94 }
95 });
96 let _ = rx_reading.await;
97 self.tcp().tasks.lock().push(reading_task);
98
99 let hdl = Box::new(ProtocolHandler(conn_sender));
101 assert!(self.tcp().protocols.reading.set(hdl).is_ok(), "the Reading protocol was enabled more than once!");
102 }
103
104 fn codec(&self, addr: SocketAddr, side: ConnectionSide) -> Self::Codec;
107
108 async fn process_message(&self, source: SocketAddr, message: Self::Message) -> io::Result<()>;
110}
111
112#[async_trait]
114trait ReadingInternal: Reading {
115 async fn handle_new_connection(&self, (conn, conn_returner): ReturnableConnection);
117
118 fn map_codec<T: AsyncRead>(
120 &self,
121 framed: FramedRead<T, Self::Codec>,
122 conn: &Connection,
123 ) -> FramedRead<T, CountingCodec<Self::Codec>>;
124}
125
126#[async_trait]
127impl<R: Reading> ReadingInternal for R {
128 async fn handle_new_connection(&self, (mut conn, conn_returner): ReturnableConnection) {
129 let addr = conn.addr();
130 let codec = self.codec(addr, !conn.side());
131 let reader = conn.reader.take().expect("missing connection reader!");
132 let framed = FramedRead::new(reader, codec);
133 let mut framed = self.map_codec(framed, &conn);
134
135 let (tx_conn_ready, rx_conn_ready) = oneshot::channel();
137 conn.readiness_notifier = Some(tx_conn_ready);
138
139 if Self::INITIAL_BUFFER_SIZE != 0 {
140 framed.read_buffer_mut().reserve(Self::INITIAL_BUFFER_SIZE);
141 }
142
143 let (inbound_message_sender, mut inbound_message_receiver) =
144 mpsc::channel::<(R::Message, QueuedMessageGuard)>(self.message_queue_depth());
145
146 let (tx_processing, rx_processing) = oneshot::channel::<()>();
148
149 let self_clone = self.clone();
151 let conn_span = conn.span().clone();
152 let inbound_processing_task = tokio::spawn(Box::pin(async move {
153 let node = self_clone.tcp();
154 trace!(parent: &conn_span, "spawned a task for processing messages");
155 tx_processing.send(()).unwrap(); while let Some((msg, _guard)) = inbound_message_receiver.recv().await {
158 if let Err(e) = self_clone.process_message(addr, msg).await {
159 error!(parent: &conn_span, "can't process a message: {e}");
160 node.known_peers().register_failure(addr.ip());
161 }
162 }
164 }));
165 let _ = rx_processing.await;
166 conn.tasks.push(inbound_processing_task);
167
168 let (tx_reader, rx_reader) = oneshot::channel::<()>();
170
171 let node = self.tcp().clone();
173 let conn_span = conn.span().clone();
174 let reader_task = tokio::spawn(Box::pin(async move {
175 trace!(parent: &conn_span, "spawned a task for reading messages");
176 tx_reader.send(()).unwrap(); let _ = rx_conn_ready.await;
181
182 let mut dropped_count: usize = 0;
184 let mut last_drop_log = Instant::now();
185
186 loop {
187 let next_frame_future = framed.next();
188 let read_result = match timeout(Self::IDLE_TIMEOUT, next_frame_future).await {
189 Ok(res) => res, Err(_) => {
191 debug!(parent: &conn_span, "connection timed out due to inactivity");
192 break;
193 }
194 };
195 match read_result {
196 Some(Ok(msg)) => {
197 if let Err(e) = inbound_message_sender.try_send((msg, QueuedMessageGuard::new())) {
199 node.stats().register_failure();
200 match e {
201 mpsc::error::TrySendError::Full(_) => {
202 dropped_count += 1;
204 if last_drop_log.elapsed() >= Duration::from_secs(1) {
205 warn_about_dropped_messages(&conn_span, &mut dropped_count, &mut last_drop_log);
206 }
207 }
208 mpsc::error::TrySendError::Closed(_) => {
209 error!(parent: &conn_span, "inbound channel closed");
210 break;
211 }
212 }
213 } else if dropped_count != 0 {
214 warn_about_dropped_messages(&conn_span, &mut dropped_count, &mut last_drop_log);
215 debug!(parent: &conn_span, "the inbound queue is no longer saturated");
216 }
217 #[cfg(feature = "metrics")]
218 metrics::increment_gauge(metrics::tcp::TCP_TASKS, 1f64);
219 }
220 Some(Err(e)) => {
221 error!(parent: &conn_span, "can't read: {e}");
222 node.known_peers().register_failure(addr.ip());
223 if node.config().fatal_io_errors.contains(&e.kind()) {
224 break;
225 }
226 }
227 None => break, }
229 }
230
231 let _ = node.disconnect(addr).await;
232 }));
233 let _ = rx_reader.await;
234 conn.tasks.push(reader_task);
235
236 if conn_returner.send(Ok(conn)).is_err() {
238 unreachable!("couldn't return a Connection to the Tcp");
239 }
240 }
241
242 fn map_codec<T: AsyncRead>(
243 &self,
244 framed: FramedRead<T, Self::Codec>,
245 conn: &Connection,
246 ) -> FramedRead<T, CountingCodec<Self::Codec>> {
247 framed.map_decoder(|codec| CountingCodec { codec, node: self.tcp().clone(), acc: 0, span: conn.span().clone() })
248 }
249}
250
251struct CountingCodec<D: Decoder> {
253 codec: D,
254 node: Tcp,
255 acc: usize,
256 span: Span,
257}
258
259impl<D: Decoder> Decoder for CountingCodec<D> {
260 type Error = D::Error;
261 type Item = D::Item;
262
263 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
264 let initial_buf_len = src.len();
265 let ret = self.codec.decode(src)?;
266 let final_buf_len = src.len();
267 let read_len = initial_buf_len - final_buf_len + self.acc;
268
269 if read_len != 0 {
270 trace!(parent: &self.span, "read {read_len}B");
271
272 if ret.is_some() {
273 self.acc = 0;
274 self.node.stats().register_received_message(read_len);
276 } else {
277 self.acc = read_len;
278 }
279 }
280
281 Ok(ret)
282 }
283}
284
285struct QueuedMessageGuard;
290
291impl QueuedMessageGuard {
292 fn new() -> Self {
293 #[cfg(feature = "metrics")]
294 metrics::increment_gauge(metrics::tcp::TCP_TASKS, 1f64);
295 Self
296 }
297}
298
299impl Drop for QueuedMessageGuard {
300 fn drop(&mut self) {
301 #[cfg(feature = "metrics")]
302 metrics::decrement_gauge(metrics::tcp::TCP_TASKS, 1f64);
303 }
304}
305
306fn warn_about_dropped_messages(span: &Span, dropped_count: &mut usize, last_drop_log: &mut Instant) {
308 warn!(
309 parent: span,
310 "dropped {dropped_count} messages due\
311 to inbound queue saturation",
312 );
313 *dropped_count = 0;
315 *last_drop_log = Instant::now();
316}