1use crate::ConnectionCloseReason;
2
// Every envelope produced in this file begins with one of these 8-byte
// markers; the decoders use the marker to tell the envelope kinds apart.

/// Leading marker of an ACK payload envelope (magic, 16-byte tag, payload).
const ACK_PAYLOAD_MAGIC: &[u8; 8] = b"ANQAckP1";

/// Leading marker of an ACK control frame (magic, 16-byte tag, kind, code).
const ACK_CONTROL_MAGIC: &[u8; 8] = b"ANQAckC1";

/// Leading marker of a probe request envelope (magic, 16-byte tag).
const PROBE_REQUEST_MAGIC: &[u8; 8] = b"ANQProR1";
6
/// Reason attached to a rejected outcome in an ACK control frame.
///
/// Each variant has a one-byte wire code in the ACK control encoding; a code
/// the decoder does not recognise is reported as [`ReceiveRejectReason::Unknown`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ReceiveRejectReason {
    /// Wire code `1`.
    ConsumerGone,
    /// Wire code `2`.
    InvalidEnvelope,
    /// Wire code `3`.
    NotSupported,
    /// Wire code `255`; also the fallback for any unrecognised code.
    Unknown,
}
19
20impl std::fmt::Display for ReceiveRejectReason {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 let value = match self {
23 Self::ConsumerGone => "ConsumerGone",
24 Self::InvalidEnvelope => "InvalidEnvelope",
25 Self::NotSupported => "NotSupported",
26 Self::Unknown => "Unknown",
27 };
28 f.write_str(value)
29 }
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq)]
33pub(crate) enum AckControlOutcome {
34 Accepted,
35 Rejected(ReceiveRejectReason),
36 Closed(ConnectionCloseReason),
37}
38
39pub(crate) fn encode_ack_payload(tag: [u8; 16], payload: &[u8]) -> Vec<u8> {
40 let mut bytes = Vec::with_capacity(ACK_PAYLOAD_MAGIC.len() + tag.len() + payload.len());
41 bytes.extend_from_slice(ACK_PAYLOAD_MAGIC);
42 bytes.extend_from_slice(&tag);
43 bytes.extend_from_slice(payload);
44 bytes
45}
46
47pub(crate) fn decode_ack_payload(bytes: &[u8]) -> Option<([u8; 16], &[u8])> {
48 if bytes.len() < ACK_PAYLOAD_MAGIC.len() + 16 || !bytes.starts_with(ACK_PAYLOAD_MAGIC) {
49 return None;
50 }
51
52 let mut tag = [0u8; 16];
53 tag.copy_from_slice(&bytes[ACK_PAYLOAD_MAGIC.len()..ACK_PAYLOAD_MAGIC.len() + 16]);
54 Some((tag, &bytes[ACK_PAYLOAD_MAGIC.len() + 16..]))
55}
56
57pub(crate) fn encode_ack_control(tag: [u8; 16], outcome: AckControlOutcome) -> Vec<u8> {
58 let mut bytes = Vec::with_capacity(ACK_CONTROL_MAGIC.len() + 18);
59 bytes.extend_from_slice(ACK_CONTROL_MAGIC);
60 bytes.extend_from_slice(&tag);
61 match outcome {
62 AckControlOutcome::Accepted => {
63 bytes.push(0);
64 bytes.push(0);
65 }
66 AckControlOutcome::Rejected(reason) => {
67 bytes.push(1);
68 bytes.push(match reason {
69 ReceiveRejectReason::ConsumerGone => 1,
70 ReceiveRejectReason::InvalidEnvelope => 2,
71 ReceiveRejectReason::NotSupported => 3,
72 ReceiveRejectReason::Unknown => 255,
73 });
74 }
75 AckControlOutcome::Closed(reason) => {
76 bytes.push(2);
77 bytes.push(match reason {
78 ConnectionCloseReason::Superseded => 1,
79 ConnectionCloseReason::ReaderExit => 2,
80 ConnectionCloseReason::PeerShutdown => 3,
81 ConnectionCloseReason::Banned => 4,
82 ConnectionCloseReason::LifecycleCleanup => 5,
83 ConnectionCloseReason::ApplicationClosed => 6,
84 ConnectionCloseReason::ConnectionClosed => 7,
85 ConnectionCloseReason::TimedOut => 8,
86 ConnectionCloseReason::Reset => 9,
87 ConnectionCloseReason::TransportError => 10,
88 ConnectionCloseReason::LocallyClosed => 11,
89 ConnectionCloseReason::VersionMismatch => 12,
90 ConnectionCloseReason::CidsExhausted => 13,
91 ConnectionCloseReason::Unknown => 255,
92 });
93 }
94 }
95 bytes
96}
97
98pub(crate) fn decode_ack_control(bytes: &[u8]) -> Option<([u8; 16], AckControlOutcome)> {
99 if bytes.len() != ACK_CONTROL_MAGIC.len() + 18 || !bytes.starts_with(ACK_CONTROL_MAGIC) {
100 return None;
101 }
102
103 let mut tag = [0u8; 16];
104 tag.copy_from_slice(&bytes[ACK_CONTROL_MAGIC.len()..ACK_CONTROL_MAGIC.len() + 16]);
105 let kind = bytes[ACK_CONTROL_MAGIC.len() + 16];
106 let value = bytes[ACK_CONTROL_MAGIC.len() + 17];
107 let outcome = match kind {
108 0 => AckControlOutcome::Accepted,
109 1 => AckControlOutcome::Rejected(match value {
110 1 => ReceiveRejectReason::ConsumerGone,
111 2 => ReceiveRejectReason::InvalidEnvelope,
112 3 => ReceiveRejectReason::NotSupported,
113 _ => ReceiveRejectReason::Unknown,
114 }),
115 2 => AckControlOutcome::Closed(match value {
116 1 => ConnectionCloseReason::Superseded,
117 2 => ConnectionCloseReason::ReaderExit,
118 3 => ConnectionCloseReason::PeerShutdown,
119 4 => ConnectionCloseReason::Banned,
120 5 => ConnectionCloseReason::LifecycleCleanup,
121 6 => ConnectionCloseReason::ApplicationClosed,
122 7 => ConnectionCloseReason::ConnectionClosed,
123 8 => ConnectionCloseReason::TimedOut,
124 9 => ConnectionCloseReason::Reset,
125 10 => ConnectionCloseReason::TransportError,
126 11 => ConnectionCloseReason::LocallyClosed,
127 12 => ConnectionCloseReason::VersionMismatch,
128 13 => ConnectionCloseReason::CidsExhausted,
129 _ => ConnectionCloseReason::Unknown,
130 }),
131 _ => return None,
132 };
133 Some((tag, outcome))
134}
135
136pub(crate) fn encode_probe_request(tag: [u8; 16]) -> Vec<u8> {
142 let mut bytes = Vec::with_capacity(PROBE_REQUEST_MAGIC.len() + tag.len());
143 bytes.extend_from_slice(PROBE_REQUEST_MAGIC);
144 bytes.extend_from_slice(&tag);
145 bytes
146}
147
148pub(crate) fn decode_probe_request(bytes: &[u8]) -> Option<[u8; 16]> {
154 if bytes.len() != PROBE_REQUEST_MAGIC.len() + 16 || !bytes.starts_with(PROBE_REQUEST_MAGIC) {
155 return None;
156 }
157 let mut tag = [0u8; 16];
158 tag.copy_from_slice(&bytes[PROBE_REQUEST_MAGIC.len()..PROBE_REQUEST_MAGIC.len() + 16]);
159 Some(tag)
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    // Encoding and then decoding an ACK payload returns the same tag and payload.
    #[test]
    fn ack_payload_roundtrip() {
        let expected_tag = [0xAB; 16];
        let expected_payload = b"hello";

        let frame = encode_ack_payload(expected_tag, expected_payload);
        let (tag, payload) = decode_ack_payload(&frame).expect("decode payload");

        assert_eq!(tag, expected_tag);
        assert_eq!(payload, expected_payload);
    }

    // Encoding and then decoding an ACK control frame returns the same tag and outcome.
    #[test]
    fn ack_control_roundtrip() {
        let expected_tag = [0xCD; 16];
        let expected_outcome = AckControlOutcome::Rejected(ReceiveRejectReason::ConsumerGone);

        let frame = encode_ack_control(expected_tag, expected_outcome);
        let (tag, outcome) = decode_ack_control(&frame).expect("decode control");

        assert_eq!(tag, expected_tag);
        assert_eq!(outcome, expected_outcome);
    }

    // Encoding and then decoding a probe request returns the same tag.
    #[test]
    fn probe_request_roundtrip() {
        let expected_tag = [0x5A; 16];

        let frame = encode_probe_request(expected_tag);
        let tag = decode_probe_request(&frame).expect("decode probe");

        assert_eq!(tag, expected_tag);
    }

    // No envelope kind may be accepted by the decoder of another kind.
    #[test]
    fn probe_envelope_distinct_from_ack_envelopes() {
        let tag = [0x77; 16];

        let probe_frame = encode_probe_request(tag);
        let payload_frame = encode_ack_payload(tag, b"hi");
        let control_frame = encode_ack_control(tag, AckControlOutcome::Accepted);

        // A probe frame fed to the ACK decoders.
        assert!(
            decode_ack_payload(&probe_frame).is_none(),
            "probe envelope must not decode as ACK payload"
        );
        assert!(
            decode_ack_control(&probe_frame).is_none(),
            "probe envelope must not decode as ACK control frame"
        );

        // ACK frames fed to the probe decoder.
        assert!(
            decode_probe_request(&payload_frame).is_none(),
            "ACK payload must not decode as probe envelope"
        );
        assert!(
            decode_probe_request(&control_frame).is_none(),
            "ACK control frame must not decode as probe envelope"
        );
    }

    // A probe request with a 15- or 17-byte tag section is rejected.
    #[test]
    fn probe_request_rejects_wrong_length() {
        for &tag_len in &[15usize, 17] {
            let mut frame = PROBE_REQUEST_MAGIC.to_vec();
            frame.resize(PROBE_REQUEST_MAGIC.len() + tag_len, 0u8);
            assert!(decode_probe_request(&frame).is_none());
        }
    }
}