1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use std::sync::Arc;
use async_channel::{Receiver, Sender};
use stratum_apps::{
fallback_coordinator::FallbackCoordinator,
network_helpers::noise_stream::{NoiseTcpReadHalf, NoiseTcpWriteHalf},
stratum_core::framing_sv2::framing::Frame,
task_manager::TaskManager,
utils::types::{Message, Sv2Frame},
};
use tokio_util::sync::CancellationToken;
use tracing::{error, trace, warn, Instrument as _};
/// Spawns the reader and writer tasks for one connection on `task_manager`.
///
/// * **Reader task** – loops on `reader.read_frame()` and forwards every
///   `Frame::Sv2` to `inbound_tx`. It stops on a read error, on a handshake
///   frame, or when forwarding to `inbound_tx` fails.
/// * **Writer task** – loops on `outbound_rx.recv()` and writes each received
///   frame with `writer.write_frame()`. It stops on a write error or when
///   `recv()` returns an error.
///
/// Both tasks also stop as soon as `cancellation_token` or the token returned
/// by `fallback_coordinator.token()` is cancelled. On exit each task calls
/// `close()` on its handles to both channels and then calls `done()` on the
/// handle it obtained from `fallback_coordinator.register()`.
///
/// Because of `#[track_caller]`, the `file:line` of the call site is recorded
/// in the `spawned_at` field of each task's tracing span.
#[cfg_attr(not(test), hotpath::measure)]
#[track_caller]
#[allow(clippy::too_many_arguments)]
pub fn spawn_io_tasks(
    task_manager: Arc<TaskManager>,
    mut reader: NoiseTcpReadHalf<Message>,
    mut writer: NoiseTcpWriteHalf<Message>,
    outbound_rx: Receiver<Sv2Frame>,
    inbound_tx: Sender<Sv2Frame>,
    cancellation_token: CancellationToken,
    fallback_coordinator: FallbackCoordinator,
) {
    let spawn_site = std::panic::Location::caller();

    // Each task holds a handle to *both* channels so it can close the pair on exit.
    let writer_inbound_tx = inbound_tx.clone();
    let reader_outbound_rx = outbound_rx.clone();

    // ---------------------------------------------------------------- reader
    {
        let shutdown = cancellation_token.clone();
        let coordinator = fallback_coordinator.clone();

        let reader_task = async move {
            // Register this task with the fallback coordinator and fetch the
            // token that signals a fallback.
            let fallback_handler = coordinator.register();
            let fallback_token = coordinator.token();
            trace!("Reader task started");

            loop {
                tokio::select! {
                    _ = shutdown.cancelled() => {
                        trace!("Received app shutdown signal");
                        inbound_tx.close();
                        break;
                    }
                    _ = fallback_token.cancelled() => {
                        trace!("Received fallback signal");
                        inbound_tx.close();
                        break;
                    }
                    res = reader.read_frame() => {
                        match res {
                            Ok(Frame::Sv2(sv2_frame)) => {
                                trace!("Received inbound frame");
                                // NOTE: this send is awaited inside the branch handler, so the
                                // cancellation branches above are not polled while it is pending.
                                if let Err(e) = inbound_tx.send(sv2_frame).await {
                                    inbound_tx.close();
                                    error!(error=?e, "Failed to forward inbound frame");
                                    break;
                                }
                            }
                            // This loop treats a handshake frame as fatal.
                            Ok(Frame::HandShake(frame)) => {
                                error!(?frame, "Received handshake frame");
                                break;
                            }
                            Err(e) => {
                                error!(error=?e, "Reader error");
                                inbound_tx.close();
                                break;
                            }
                        }
                    }
                }
            }

            // Final cleanup: close both channels and release this task's handles.
            inbound_tx.close();
            reader_outbound_rx.close();
            drop(inbound_tx);
            drop(reader_outbound_rx);
            // Tell the fallback coordinator that this task has finished its cleanup.
            fallback_handler.done();
            warn!("Reader task exited.");
        }
        .instrument(tracing::trace_span!(
            "reader_task",
            spawned_at = %format!("{}:{}", spawn_site.file(), spawn_site.line())
        ));

        task_manager.spawn(reader_task);
    }

    // ---------------------------------------------------------------- writer
    {
        let coordinator = fallback_coordinator.clone();

        let writer_task = async move {
            // Register this task with the fallback coordinator and fetch the
            // token that signals a fallback.
            let fallback_handler = coordinator.register();
            let fallback_token = coordinator.token();
            trace!("Writer task started");

            loop {
                tokio::select! {
                    _ = cancellation_token.cancelled() => {
                        trace!("Received app shutdown signal");
                        writer_inbound_tx.close();
                        break;
                    }
                    _ = fallback_token.cancelled() => {
                        trace!("Received fallback signal");
                        writer_inbound_tx.close();
                        break;
                    }
                    res = outbound_rx.recv() => {
                        match res {
                            Err(_) => {
                                outbound_rx.close();
                                warn!("Outbound channel closed");
                                break;
                            }
                            Ok(frame) => {
                                trace!("Sending outbound frame");
                                // NOTE: the write is awaited inside the branch handler, so the
                                // cancellation branches above are not polled until it completes.
                                if let Err(e) = writer.write_frame(frame.into()).await {
                                    error!(error=?e, "Writer error");
                                    outbound_rx.close();
                                    break;
                                }
                            }
                        }
                    }
                }
            }

            // Final cleanup: close both channels and release this task's handles.
            outbound_rx.close();
            writer_inbound_tx.close();
            drop(outbound_rx);
            drop(writer_inbound_tx);
            // Tell the fallback coordinator that this task has finished its cleanup.
            fallback_handler.done();
            warn!("Writer task exited.");
        }
        .instrument(tracing::trace_span!(
            "writer_task",
            spawned_at = %format!("{}:{}", spawn_site.file(), spawn_site.line())
        ));

        task_manager.spawn(writer_task);
    }
}