1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
use klukai_types::{
actor::ClusterId,
broadcast::{BroadcastV1, ChangeSource, ChangeV1, UniPayload, UniPayloadV1},
channel::CorroSender,
tripwire::Tripwire,
};
use metrics::counter;
use speedy::Readable;
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tracing::{debug, error, trace};
/// Spawn a task that accepts unidirectional broadcast streams, then
/// spawns another task for each incoming stream to handle.
pub fn spawn_unipayload_handler(
tripwire: &Tripwire,
conn: &quinn::Connection,
cluster_id: ClusterId,
tx_changes: CorroSender<(ChangeV1, ChangeSource)>,
) {
tokio::spawn({
let conn = conn.clone();
let mut tripwire = tripwire.clone();
async move {
loop {
let rx = tokio::select! {
rx_res = conn.accept_uni() => match rx_res {
Ok(rx) => rx,
Err(e) => {
debug!("could not accept unidirectional stream from connection: {e}");
return;
}
},
_ = &mut tripwire => {
debug!("connection cancelled");
return;
}
};
counter!("corro.peer.stream.accept.total", "type" => "uni").increment(1);
trace!(
"accepted a unidirectional stream from {}",
conn.remote_address()
);
tokio::spawn({
let tx_changes = tx_changes.clone();
async move {
let mut framed = FramedRead::new(
rx,
LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec(),
);
let mut changes = vec![];
loop {
match StreamExt::next(&mut framed).await {
Some(Ok(b)) => {
counter!("corro.peer.stream.bytes.recv.total", "type" => "uni")
.increment(b.len() as u64);
match UniPayload::read_from_buffer(&b) {
Ok(payload) => {
trace!("parsed a payload: {payload:?}");
match payload {
UniPayload::V1 {
data:
UniPayloadV1::Broadcast(BroadcastV1::Change(
change,
)),
cluster_id: payload_cluster_id,
} => {
if cluster_id != payload_cluster_id {
continue;
}
changes.push((change, ChangeSource::Broadcast));
}
}
}
Err(e) => {
error!("could not decode UniPayload: {e}");
continue;
}
}
}
Some(Err(e)) => {
error!("decode error: {e}");
}
None => break,
}
}
for change in changes.into_iter().rev() {
if let Err(e) = tx_changes.send(change).await {
error!("could not send change for processing: {e}");
return;
}
}
}
});
}
}
});
}