snarkos_node_tcp/protocols/
writing.rs1use std::{any::Any, collections::HashMap, io, net::SocketAddr, sync::Arc};
17
18use async_trait::async_trait;
19use futures_util::sink::SinkExt;
20#[cfg(feature = "locktick")]
21use locktick::parking_lot::RwLock;
22#[cfg(not(feature = "locktick"))]
23use parking_lot::RwLock;
24use tokio::{
25 io::AsyncWrite,
26 sync::{mpsc, oneshot},
27};
28use tokio_util::codec::{Encoder, FramedWrite};
29use tracing::*;
30
31#[cfg(doc)]
32use crate::{Config, Tcp, protocols::Handshake};
33use crate::{
34 Connection,
35 ConnectionSide,
36 P2P,
37 protocols::{Protocol, ProtocolHandler, ReturnableConnection},
38};
39
40type WritingSenders = Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<WrappedMessage>>>>;
41
42#[async_trait]
45pub trait Writing: P2P
46where
47 Self: Clone + Send + Sync + 'static,
48{
49 fn message_queue_depth(&self) -> usize {
55 1024
56 }
57
58 type Message: Send;
62
63 type Codec: Encoder<Self::Message, Error = io::Error> + Send;
65
66 async fn enable_writing(&self) {
68 let (conn_sender, mut conn_receiver) = mpsc::unbounded_channel();
69
70 let conn_senders: WritingSenders = Default::default();
72 let senders = conn_senders.clone();
74
75 let (tx_writing, rx_writing) = oneshot::channel();
77
78 let self_clone = self.clone();
80 let writing_task = tokio::spawn(async move {
81 trace!(parent: self_clone.tcp().span(), "spawned the Writing handler task");
82 tx_writing.send(()).unwrap(); while let Some(returnable_conn) = conn_receiver.recv().await {
86 self_clone.handle_new_connection(returnable_conn, &conn_senders).await;
87 }
88 });
89 let _ = rx_writing.await;
90 self.tcp().tasks.lock().push(writing_task);
91
92 let hdl = Box::new(WritingHandler { handler: ProtocolHandler(conn_sender), senders });
94 assert!(self.tcp().protocols.writing.set(hdl).is_ok(), "the Writing protocol was enabled more than once!");
95 }
96
97 fn codec(&self, addr: SocketAddr, side: ConnectionSide) -> Self::Codec;
100
101 fn unicast(&self, addr: SocketAddr, message: Self::Message) -> io::Result<oneshot::Receiver<io::Result<()>>> {
112 if let Some(handler) = self.tcp().protocols.writing.get() {
114 if let Some(sender) = handler.senders.read().get(&addr).cloned() {
116 let (msg, delivery) = WrappedMessage::new(Box::new(message));
117 sender
118 .try_send(msg)
119 .map_err(|e| {
120 error!(parent: self.tcp().span(), "can't send a message to {}: {}", addr, e);
121 self.tcp().stats().register_failure();
122 io::ErrorKind::Other.into()
123 })
124 .map(|_| delivery)
125 } else {
126 Err(io::ErrorKind::NotConnected.into())
127 }
128 } else {
129 Err(io::ErrorKind::Unsupported.into())
130 }
131 }
132
133 fn broadcast(&self, message: Self::Message) -> io::Result<()>
142 where
143 Self::Message: Clone,
144 {
145 if let Some(handler) = self.tcp().protocols.writing.get() {
147 let senders = handler.senders.read().clone();
148 for (addr, message_sender) in senders {
149 let (msg, _delivery) = WrappedMessage::new(Box::new(message.clone()));
150 let _ = message_sender.try_send(msg).map_err(|e| {
151 error!(parent: self.tcp().span(), "can't send a message to {}: {}", addr, e);
152 self.tcp().stats().register_failure();
153 });
154 }
155
156 Ok(())
157 } else {
158 Err(io::ErrorKind::Unsupported.into())
159 }
160 }
161}
162
163#[async_trait]
165trait WritingInternal: Writing {
166 async fn write_to_stream<W: AsyncWrite + Unpin + Send>(
168 &self,
169 message: Self::Message,
170 writer: &mut FramedWrite<W, Self::Codec>,
171 ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error>;
172
173 async fn handle_new_connection(&self, (conn, conn_returner): ReturnableConnection, conn_senders: &WritingSenders);
175}
176
177#[async_trait]
178impl<W: Writing> WritingInternal for W {
179 async fn write_to_stream<A: AsyncWrite + Unpin + Send>(
180 &self,
181 message: Self::Message,
182 writer: &mut FramedWrite<A, Self::Codec>,
183 ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error> {
184 writer.feed(message).await?;
185 let len = writer.write_buffer().len();
186 writer.flush().await?;
187
188 Ok(len)
189 }
190
191 async fn handle_new_connection(
192 &self,
193 (mut conn, conn_returner): ReturnableConnection,
194 conn_senders: &WritingSenders,
195 ) {
196 let addr = conn.addr();
197 let codec = self.codec(addr, !conn.side());
198 let writer = conn.writer.take().expect("missing connection writer!");
199 let mut framed = FramedWrite::new(writer, codec);
200
201 let (outbound_message_sender, mut outbound_message_receiver) = mpsc::channel(self.message_queue_depth());
202
203 conn_senders.write().insert(addr, outbound_message_sender);
205
206 let auto_cleanup = SenderCleanup { addr, senders: Arc::clone(conn_senders) };
208
209 let (tx_writer, rx_writer) = oneshot::channel();
211
212 let self_clone = self.clone();
214 let writer_task = tokio::spawn(async move {
215 let node = self_clone.tcp();
216 trace!(parent: node.span(), "spawned a task for writing messages to {}", addr);
217 tx_writer.send(()).unwrap(); let _auto_cleanup = auto_cleanup;
221
222 while let Some(wrapped_msg) = outbound_message_receiver.recv().await {
223 let msg = wrapped_msg.msg.downcast().unwrap();
224
225 match self_clone.write_to_stream(*msg, &mut framed).await {
226 Ok(len) => {
227 let _ = wrapped_msg.delivery_notification.send(Ok(()));
228 node.known_peers().register_sent_message(addr.ip(), len);
229 node.stats().register_sent_message(len);
230 trace!(parent: node.span(), "sent {}B to {}", len, addr);
231 }
232 Err(e) => {
233 node.known_peers().register_failure(addr.ip());
234 error!(parent: node.span(), "couldn't send a message to {}: {}", addr, e);
235 let is_fatal = node.config().fatal_io_errors.contains(&e.kind());
236 let _ = wrapped_msg.delivery_notification.send(Err(e));
237 if is_fatal {
238 break;
239 }
240 }
241 }
242 }
243
244 node.disconnect(addr).await;
245 });
246 let _ = rx_writer.await;
247 conn.tasks.push(writer_task);
248
249 if conn_returner.send(Ok(conn)).is_err() {
251 unreachable!("couldn't return a Connection to the Tcp");
252 }
253 }
254}
255
256struct WrappedMessage {
258 msg: Box<dyn Any + Send>,
259 delivery_notification: oneshot::Sender<io::Result<()>>,
260}
261
262impl WrappedMessage {
263 fn new(msg: Box<dyn Any + Send>) -> (Self, oneshot::Receiver<io::Result<()>>) {
264 let (tx, rx) = oneshot::channel();
265 let wrapped_msg = Self { msg, delivery_notification: tx };
266
267 (wrapped_msg, rx)
268 }
269}
270
271pub(crate) struct WritingHandler {
273 handler: ProtocolHandler<Connection, io::Result<Connection>>,
274 senders: WritingSenders,
275}
276
277impl Protocol<Connection, io::Result<Connection>> for WritingHandler {
278 fn trigger(&self, item: ReturnableConnection) {
279 self.handler.trigger(item);
280 }
281}
282
283struct SenderCleanup {
284 addr: SocketAddr,
285 senders: WritingSenders,
286}
287
288impl Drop for SenderCleanup {
289 fn drop(&mut self) {
290 self.senders.write().remove(&self.addr);
291 }
292}