snarkos_node_tcp/protocols/
writing.rs1use std::{any::Any, collections::HashMap, io, net::SocketAddr, sync::Arc};
17
18use async_trait::async_trait;
19use futures_util::sink::SinkExt;
20use parking_lot::RwLock;
21use tokio::{
22 io::AsyncWrite,
23 sync::{mpsc, oneshot},
24};
25use tokio_util::codec::{Encoder, FramedWrite};
26use tracing::*;
27
28#[cfg(doc)]
29use crate::{Config, Tcp, protocols::Handshake};
30use crate::{
31 Connection,
32 ConnectionSide,
33 P2P,
34 protocols::{Protocol, ProtocolHandler, ReturnableConnection},
35};
36
/// The shared map from a connection's address to the sending half of its outbound message queue;
/// entries are inserted when a connection is handed to the `Writing` protocol and removed when the
/// corresponding `SenderCleanup` guard is dropped.
type WritingSenders = Arc<RwLock<HashMap<SocketAddr, mpsc::Sender<WrappedMessage>>>>;
38
39#[async_trait]
42pub trait Writing: P2P
43where
44 Self: Clone + Send + Sync + 'static,
45{
46 const MESSAGE_QUEUE_DEPTH: usize = 1024;
52
53 type Message: Send;
57
58 type Codec: Encoder<Self::Message, Error = io::Error> + Send;
60
61 async fn enable_writing(&self) {
63 let (conn_sender, mut conn_receiver) = mpsc::unbounded_channel();
64
65 let conn_senders: WritingSenders = Default::default();
67 let senders = conn_senders.clone();
69
70 let (tx_writing, rx_writing) = oneshot::channel();
72
73 let self_clone = self.clone();
75 let writing_task = tokio::spawn(async move {
76 trace!(parent: self_clone.tcp().span(), "spawned the Writing handler task");
77 tx_writing.send(()).unwrap(); while let Some(returnable_conn) = conn_receiver.recv().await {
81 self_clone.handle_new_connection(returnable_conn, &conn_senders).await;
82 }
83 });
84 let _ = rx_writing.await;
85 self.tcp().tasks.lock().push(writing_task);
86
87 let hdl = Box::new(WritingHandler { handler: ProtocolHandler(conn_sender), senders });
89 assert!(self.tcp().protocols.writing.set(hdl).is_ok(), "the Writing protocol was enabled more than once!");
90 }
91
92 fn codec(&self, addr: SocketAddr, side: ConnectionSide) -> Self::Codec;
95
96 fn unicast(&self, addr: SocketAddr, message: Self::Message) -> io::Result<oneshot::Receiver<io::Result<()>>> {
107 if let Some(handler) = self.tcp().protocols.writing.get() {
109 if let Some(sender) = handler.senders.read().get(&addr).cloned() {
111 let (msg, delivery) = WrappedMessage::new(Box::new(message));
112 sender
113 .try_send(msg)
114 .map_err(|e| {
115 error!(parent: self.tcp().span(), "can't send a message to {}: {}", addr, e);
116 self.tcp().stats().register_failure();
117 io::ErrorKind::Other.into()
118 })
119 .map(|_| delivery)
120 } else {
121 Err(io::ErrorKind::NotConnected.into())
122 }
123 } else {
124 Err(io::ErrorKind::Unsupported.into())
125 }
126 }
127
128 fn broadcast(&self, message: Self::Message) -> io::Result<()>
137 where
138 Self::Message: Clone,
139 {
140 if let Some(handler) = self.tcp().protocols.writing.get() {
142 let senders = handler.senders.read().clone();
143 for (addr, message_sender) in senders {
144 let (msg, _delivery) = WrappedMessage::new(Box::new(message.clone()));
145 let _ = message_sender.try_send(msg).map_err(|e| {
146 error!(parent: self.tcp().span(), "can't send a message to {}: {}", addr, e);
147 self.tcp().stats().register_failure();
148 });
149 }
150
151 Ok(())
152 } else {
153 Err(io::ErrorKind::Unsupported.into())
154 }
155 }
156}
157
158#[async_trait]
160trait WritingInternal: Writing {
161 async fn write_to_stream<W: AsyncWrite + Unpin + Send>(
163 &self,
164 message: Self::Message,
165 writer: &mut FramedWrite<W, Self::Codec>,
166 ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error>;
167
168 async fn handle_new_connection(&self, (conn, conn_returner): ReturnableConnection, conn_senders: &WritingSenders);
170}
171
172#[async_trait]
173impl<W: Writing> WritingInternal for W {
174 async fn write_to_stream<A: AsyncWrite + Unpin + Send>(
175 &self,
176 message: Self::Message,
177 writer: &mut FramedWrite<A, Self::Codec>,
178 ) -> Result<usize, <Self::Codec as Encoder<Self::Message>>::Error> {
179 writer.feed(message).await?;
180 let len = writer.write_buffer().len();
181 writer.flush().await?;
182
183 Ok(len)
184 }
185
186 async fn handle_new_connection(
187 &self,
188 (mut conn, conn_returner): ReturnableConnection,
189 conn_senders: &WritingSenders,
190 ) {
191 let addr = conn.addr();
192 let codec = self.codec(addr, !conn.side());
193 let writer = conn.writer.take().expect("missing connection writer!");
194 let mut framed = FramedWrite::new(writer, codec);
195
196 let (outbound_message_sender, mut outbound_message_receiver) = mpsc::channel(Self::MESSAGE_QUEUE_DEPTH);
197
198 conn_senders.write().insert(addr, outbound_message_sender);
200
201 let auto_cleanup = SenderCleanup { addr, senders: Arc::clone(conn_senders) };
203
204 let (tx_writer, rx_writer) = oneshot::channel();
206
207 let self_clone = self.clone();
209 let writer_task = tokio::spawn(async move {
210 let node = self_clone.tcp();
211 trace!(parent: node.span(), "spawned a task for writing messages to {}", addr);
212 tx_writer.send(()).unwrap(); let _auto_cleanup = auto_cleanup;
216
217 while let Some(wrapped_msg) = outbound_message_receiver.recv().await {
218 let msg = wrapped_msg.msg.downcast().unwrap();
219
220 match self_clone.write_to_stream(*msg, &mut framed).await {
221 Ok(len) => {
222 let _ = wrapped_msg.delivery_notification.send(Ok(()));
223 node.known_peers().register_sent_message(addr.ip(), len);
224 node.stats().register_sent_message(len);
225 trace!(parent: node.span(), "sent {}B to {}", len, addr);
226 }
227 Err(e) => {
228 node.known_peers().register_failure(addr.ip());
229 error!(parent: node.span(), "couldn't send a message to {}: {}", addr, e);
230 let is_fatal = node.config().fatal_io_errors.contains(&e.kind());
231 let _ = wrapped_msg.delivery_notification.send(Err(e));
232 if is_fatal {
233 break;
234 }
235 }
236 }
237 }
238
239 node.disconnect(addr).await;
240 });
241 let _ = rx_writer.await;
242 conn.tasks.push(writer_task);
243
244 if conn_returner.send(Ok(conn)).is_err() {
246 unreachable!("couldn't return a Connection to the Tcp");
247 }
248 }
249}
250
251struct WrappedMessage {
253 msg: Box<dyn Any + Send>,
254 delivery_notification: oneshot::Sender<io::Result<()>>,
255}
256
257impl WrappedMessage {
258 fn new(msg: Box<dyn Any + Send>) -> (Self, oneshot::Receiver<io::Result<()>>) {
259 let (tx, rx) = oneshot::channel();
260 let wrapped_msg = Self { msg, delivery_notification: tx };
261
262 (wrapped_msg, rx)
263 }
264}
265
266pub(crate) struct WritingHandler {
268 handler: ProtocolHandler<Connection, io::Result<Connection>>,
269 senders: WritingSenders,
270}
271
272impl Protocol<Connection, io::Result<Connection>> for WritingHandler {
273 fn trigger(&self, item: ReturnableConnection) {
274 self.handler.trigger(item);
275 }
276}
277
278struct SenderCleanup {
279 addr: SocketAddr,
280 senders: WritingSenders,
281}
282
283impl Drop for SenderCleanup {
284 fn drop(&mut self) {
285 self.senders.write().remove(&self.addr);
286 }
287}