1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use crate::{connections::ConnectionWriter, protocols::ReturnableConnection, Pea2Pea};
use async_trait::async_trait;
use bytes::Bytes;
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::*;
use std::io;
#[async_trait]
pub trait Writing: Pea2Pea
where
Self: Clone + Send + Sync + 'static,
{
fn enable_writing(&self) {
let (conn_sender, mut conn_receiver) =
mpsc::channel::<ReturnableConnection>(self.node().config.writing_handler_queue_depth);
let self_clone = self.clone();
let writing_task = tokio::spawn(async move {
trace!(parent: self_clone.node().span(), "spawned the `Writing` handler task");
loop {
if let Some((mut conn, conn_returner)) = conn_receiver.recv().await {
let addr = conn.addr;
let mut conn_writer = conn.writer.take().unwrap();
let (outbound_message_sender, mut outbound_message_receiver) =
mpsc::channel::<Bytes>(self_clone.node().config.conn_outbound_queue_depth);
let writer_clone = self_clone.clone();
let writer_task = tokio::spawn(async move {
let node = writer_clone.node();
trace!(parent: node.span(), "spawned a task for writing messages to {}", addr);
loop {
while let Some(msg) = outbound_message_receiver.recv().await {
if let Err(e) =
writer_clone.write_message(&mut conn_writer, &msg).await
{
node.known_peers().register_failure(addr);
error!(parent: node.span(), "couldn't send {}B to {}: {}", msg.len(), addr, e);
} else {
node.known_peers().register_sent_message(addr, msg.len());
node.stats.register_sent_message(msg.len());
trace!(parent: node.span(), "sent {}B to {}", msg.len(), addr);
}
}
}
});
conn.tasks.push(writer_task);
conn.outbound_message_sender = Some(outbound_message_sender);
if conn_returner.send(Ok(conn)).is_err() {
panic!("can't return a Connection to the Node");
}
}
}
});
self.node()
.set_writing_handler((conn_sender, writing_task).into());
}
async fn write_message(&self, writer: &mut ConnectionWriter, payload: &[u8]) -> io::Result<()>;
}
pub struct WritingHandler {
sender: mpsc::Sender<ReturnableConnection>,
_task: JoinHandle<()>,
}
impl WritingHandler {
pub async fn send(&self, returnable_conn: ReturnableConnection) {
if self.sender.send(returnable_conn).await.is_err() {
panic!("WritingHandler's Receiver is closed")
}
}
}
impl From<(mpsc::Sender<ReturnableConnection>, JoinHandle<()>)> for WritingHandler {
fn from((sender, _task): (mpsc::Sender<ReturnableConnection>, JoinHandle<()>)) -> Self {
Self { sender, _task }
}
}