snarkos_node_tcp/protocols/
writing.rs1use std::{any::Any, collections::HashMap, io, net::SocketAddr, sync::Arc, time::Duration};
17
18use async_trait::async_trait;
19use futures_util::sink::SinkExt;
20#[cfg(feature = "locktick")]
21use locktick::parking_lot::RwLock;
22#[cfg(not(feature = "locktick"))]
23use parking_lot::RwLock;
24use tokio::{
25 io::AsyncWrite,
26 sync::{mpsc, oneshot},
27 time::timeout,
28};
29use tokio_util::codec::{Encoder, FramedWrite};
30use tracing::*;
31
32#[cfg(doc)]
33use crate::{Config, Tcp, protocols::Handshake};
34use crate::{
35 Connection,
36 ConnectionSide,
37 P2P,
38 connections::create_connection_span,
39 protocols::{Protocol, ProtocolHandler, ReturnableConnection},
40};
41
42type WritingSenders = Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<WrappedMessage>>>>;
43
44#[async_trait]
47pub trait Writing: P2P
48where
49 Self: Clone + Send + Sync + 'static,
50{
51 fn message_queue_depth(&self) -> usize {
57 1024
58 }
59
60 const TIMEOUT: Duration = Duration::from_secs(5);
63
64 type Message: Send;
68
69 type Codec: Encoder<Self::Message, Error = io::Error> + Send;
71
72 async fn enable_writing(&self) {
74 let (conn_sender, mut conn_receiver) = mpsc::channel(self.tcp().config().max_connections as usize);
75
76 let conn_senders: WritingSenders = Default::default();
78 let senders = conn_senders.clone();
80
81 let (tx_writing, rx_writing) = oneshot::channel();
83
84 let self_clone = self.clone();
86 let writing_task = tokio::spawn(async move {
87 trace!(parent: self_clone.tcp().span(), "spawned the Writing handler task");
88 tx_writing.send(()).unwrap(); while let Some(returnable_conn) = conn_receiver.recv().await {
92 self_clone.handle_new_connection(returnable_conn, &conn_senders).await;
93 }
94 });
95 let _ = rx_writing.await;
96 self.tcp().tasks.lock().push(writing_task);
97
98 let hdl = Box::new(WritingHandler { handler: ProtocolHandler(conn_sender), senders });
100 assert!(self.tcp().protocols.writing.set(hdl).is_ok(), "the Writing protocol was enabled more than once!");
101 }
102
103 fn codec(&self, addr: SocketAddr, side: ConnectionSide) -> Self::Codec;
106
107 fn unicast(&self, addr: SocketAddr, message: Self::Message) -> io::Result<oneshot::Receiver<io::Result<()>>> {
118 if let Some(handler) = self.tcp().protocols.writing.get() {
120 if let Some(sender) = handler.senders.read().get(&addr).cloned() {
122 let (msg, delivery) = WrappedMessage::new(Box::new(message));
123 sender
124 .try_send(msg)
125 .map_err(|e| {
126 let conn_span = create_connection_span(addr, self.tcp().span());
127 error!(parent: conn_span, "can't send a message: {e}");
128 self.tcp().stats().register_failure();
129 io::ErrorKind::Other.into()
130 })
131 .map(|_| delivery)
132 } else {
133 Err(io::ErrorKind::NotConnected.into())
134 }
135 } else {
136 Err(io::ErrorKind::Unsupported.into())
137 }
138 }
139
140 fn broadcast(&self, message: Self::Message) -> io::Result<()>
149 where
150 Self::Message: Clone,
151 {
152 if let Some(handler) = self.tcp().protocols.writing.get() {
154 let senders = handler.senders.read().clone();
155 for (addr, message_sender) in senders {
156 let (msg, _delivery) = WrappedMessage::new(Box::new(message.clone()));
157 let _ = message_sender.try_send(msg).map_err(|e| {
158 let conn_span = create_connection_span(addr, self.tcp().span());
159 error!(parent: conn_span, "can't send a message: {e}");
160 self.tcp().stats().register_failure();
161 });
162 }
163
164 Ok(())
165 } else {
166 Err(io::ErrorKind::Unsupported.into())
167 }
168 }
169}
170
171#[async_trait]
173trait WritingInternal: Writing {
174 async fn write_to_stream<W: AsyncWrite + Unpin + Send>(
176 &self,
177 message: Self::Message,
178 writer: &mut FramedWrite<W, Self::Codec>,
179 ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error>;
180
181 async fn handle_new_connection(&self, (conn, conn_returner): ReturnableConnection, conn_senders: &WritingSenders);
183}
184
185#[async_trait]
186impl<W: Writing> WritingInternal for W {
187 async fn write_to_stream<A: AsyncWrite + Unpin + Send>(
188 &self,
189 message: Self::Message,
190 writer: &mut FramedWrite<A, Self::Codec>,
191 ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error> {
192 writer.feed(message).await?;
193 let len = writer.write_buffer().len();
194 match timeout(W::TIMEOUT, writer.flush()).await {
196 Ok(Ok(())) => Ok(len),
197 Ok(Err(e)) => Err(e),
198 Err(_) => Err(io::Error::new(io::ErrorKind::TimedOut, "write timed out")),
199 }
200 }
201
202 async fn handle_new_connection(
203 &self,
204 (mut conn, conn_returner): ReturnableConnection,
205 conn_senders: &WritingSenders,
206 ) {
207 let addr = conn.addr();
208 let codec = self.codec(addr, !conn.side());
209 let writer = conn.writer.take().expect("missing connection writer!");
210 let mut framed = FramedWrite::new(writer, codec);
211
212 let (outbound_message_sender, mut outbound_message_receiver) = mpsc::channel(self.message_queue_depth());
213
214 conn_senders.write().insert(addr, outbound_message_sender);
216
217 let auto_cleanup = SenderCleanup { addr, senders: Arc::clone(conn_senders) };
219
220 let (tx_writer, rx_writer) = oneshot::channel();
222
223 let self_clone = self.clone();
225 let conn_span = conn.span().clone();
226 let writer_task = tokio::spawn(Box::pin(async move {
227 let node = self_clone.tcp();
228 trace!(parent: &conn_span, "spawned a task for writing messages");
229 tx_writer.send(()).unwrap(); let _auto_cleanup = auto_cleanup;
233
234 while let Some(wrapped_msg) = outbound_message_receiver.recv().await {
235 let msg = wrapped_msg.msg.downcast().unwrap();
236
237 match self_clone.write_to_stream(*msg, &mut framed).await {
238 Ok(len) => {
239 let _ = wrapped_msg.delivery_notification.send(Ok(()));
240 node.stats().register_sent_message(len);
242 trace!(parent: &conn_span, "sent {len}B");
243 }
244 Err(e) => {
245 node.known_peers().register_failure(addr.ip());
246 error!(parent: &conn_span, "couldn't send a message: {e}");
247 let is_fatal = node.config().fatal_io_errors.contains(&e.kind());
248 let _ = wrapped_msg.delivery_notification.send(Err(e));
249 if is_fatal {
250 break;
251 }
252 }
253 }
254 }
255
256 node.disconnect(addr).await;
257 }));
258 let _ = rx_writer.await;
259 conn.tasks.push(writer_task);
260
261 if conn_returner.send(Ok(conn)).is_err() {
263 unreachable!("couldn't return a Connection to the Tcp");
264 }
265 }
266}
267
268struct WrappedMessage {
270 msg: Box<dyn Any + Send>,
271 delivery_notification: oneshot::Sender<io::Result<()>>,
272}
273
274impl WrappedMessage {
275 fn new(msg: Box<dyn Any + Send>) -> (Self, oneshot::Receiver<io::Result<()>>) {
276 let (tx, rx) = oneshot::channel();
277 let wrapped_msg = Self { msg, delivery_notification: tx };
278
279 (wrapped_msg, rx)
280 }
281}
282
283pub(crate) struct WritingHandler {
285 handler: ProtocolHandler<Connection, io::Result<Connection>>,
286 senders: WritingSenders,
287}
288
289impl Protocol<Connection, io::Result<Connection>> for WritingHandler {
290 async fn trigger(&self, item: ReturnableConnection) {
291 self.handler.trigger(item).await;
292 }
293}
294
295struct SenderCleanup {
296 addr: SocketAddr,
297 senders: WritingSenders,
298}
299
300impl Drop for SenderCleanup {
301 fn drop(&mut self) {
302 self.senders.write().remove(&self.addr);
303 }
304}