1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use std::sync::Arc;
use async_channel::{Receiver, Sender};
use bitcoin_core_sv2::template_distribution_protocol::CancellationToken;
use stratum_apps::{
network_helpers::noise_stream::{NoiseTcpReadHalf, NoiseTcpWriteHalf},
stratum_core::framing_sv2::framing::Frame,
task_manager::TaskManager,
utils::types::{Message, Sv2Frame},
};
use tracing::{error, trace, warn, Instrument as _};
/// Spawns async reader and writer tasks for handling framed I/O with shutdown support.
#[track_caller]
#[cfg_attr(not(test), hotpath::measure)]
pub fn spawn_io_tasks(
task_manager: Arc<TaskManager>,
mut reader: NoiseTcpReadHalf<Message>,
mut writer: NoiseTcpWriteHalf<Message>,
outbound_rx: Receiver<Sv2Frame>,
inbound_tx: Sender<Sv2Frame>,
cancellation_token: CancellationToken,
) {
let caller = std::panic::Location::caller();
let inbound_tx_clone = inbound_tx.clone();
let outbound_rx_clone = outbound_rx.clone();
{
let cancellation_token = cancellation_token.clone();
task_manager.spawn(
async move {
trace!("Reader task started");
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
trace!("Received shutdown");
inbound_tx.close();
break;
}
res = reader.read_frame() => {
match res {
Ok(frame) => {
match frame {
Frame::HandShake(frame) => {
error!(?frame, "Received handshake frame");
drop(frame);
break;
},
Frame::Sv2(sv2_frame) => {
trace!("Received inbound frame");
if let Err(e) = inbound_tx.send(sv2_frame).await {
inbound_tx.close();
error!(error=?e, "Failed to forward inbound frame");
break;
}
},
}
}
Err(e) => {
error!(error=?e, "Reader error");
inbound_tx.close();
break;
}
}
}
}
}
inbound_tx.close();
outbound_rx_clone.close();
drop(inbound_tx);
drop(outbound_rx_clone);
warn!("Reader task exited.");
}
.instrument(tracing::trace_span!(
"reader_task",
spawned_at = %format!("{}:{}", caller.file(), caller.line())
)),
);
}
{
let cancellation_token = cancellation_token.clone();
task_manager.spawn(
async move {
trace!("Writer task started");
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
trace!("Received shutdown");
outbound_rx.close();
break;
}
res = outbound_rx.recv() => {
match res {
Ok(frame) => {
trace!("Sending outbound frame");
if let Err(e) = writer.write_frame(frame.into()).await {
error!(error=?e, "Writer error");
outbound_rx.close();
break;
}
}
Err(_) => {
outbound_rx.close();
warn!("Outbound channel closed");
break;
}
}
}
}
}
outbound_rx.close();
inbound_tx_clone.close();
drop(outbound_rx);
drop(inbound_tx_clone);
warn!("Writer task exited.");
}
.instrument(tracing::trace_span!(
"writer_task",
spawned_at = %format!("{}:{}", caller.file(), caller.line())
)),
);
}
}