Skip to main content

ant_quic/
ack_frame.rs

1use crate::ConnectionCloseReason;
2
// Each envelope kind starts with its own 8-byte ASCII magic; the decoders in
// this file check it with `starts_with` before reading any other field.
/// Prefix of an ACK-requested payload envelope: magic, 16-byte tag, then payload.
const ACK_PAYLOAD_MAGIC: &[u8; 8] = b"ANQAckP1";
/// Prefix of an ACK control frame: magic, 16-byte tag, kind byte, value byte.
const ACK_CONTROL_MAGIC: &[u8; 8] = b"ANQAckC1";
/// Prefix of a probe-liveness request envelope: magic followed by a 16-byte tag.
const PROBE_REQUEST_MAGIC: &[u8; 8] = b"ANQProR1";
6
/// Reasons the remote receive pipeline rejected an ACK-requested payload.
///
/// On the wire a reason travels as the value byte of a "rejected" ACK control
/// frame. Value bytes the decoder does not recognise are reported as
/// [`ReceiveRejectReason::Unknown`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReceiveRejectReason {
    /// The local consumer side of `recv()` is no longer available.
    /// Encoded as value byte `1`.
    ConsumerGone,
    /// The payload format was invalid for ACK-v1.
    /// Encoded as value byte `2`.
    InvalidEnvelope,
    /// The request is not supported on this connection.
    /// Encoded as value byte `3`.
    NotSupported,
    /// The payload was rejected for an unspecified reason.
    /// Encoded as value byte `255`; also the decode fallback for any other byte.
    Unknown,
}
19
20impl std::fmt::Display for ReceiveRejectReason {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        let value = match self {
23            Self::ConsumerGone => "ConsumerGone",
24            Self::InvalidEnvelope => "InvalidEnvelope",
25            Self::NotSupported => "NotSupported",
26            Self::Unknown => "Unknown",
27        };
28        f.write_str(value)
29    }
30}
31
/// Outcome carried by an ACK control frame.
///
/// `encode_ack_control` serialises it as two bytes after the tag: a kind byte
/// selecting the variant and a value byte holding the variant's reason code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum AckControlOutcome {
    /// Kind byte `0`; the value byte is written as `0`.
    Accepted,
    /// Kind byte `1`; the value byte identifies the [`ReceiveRejectReason`].
    Rejected(ReceiveRejectReason),
    /// Kind byte `2`; the value byte identifies the [`ConnectionCloseReason`].
    Closed(ConnectionCloseReason),
}
38
39pub(crate) fn encode_ack_payload(tag: [u8; 16], payload: &[u8]) -> Vec<u8> {
40    let mut bytes = Vec::with_capacity(ACK_PAYLOAD_MAGIC.len() + tag.len() + payload.len());
41    bytes.extend_from_slice(ACK_PAYLOAD_MAGIC);
42    bytes.extend_from_slice(&tag);
43    bytes.extend_from_slice(payload);
44    bytes
45}
46
47pub(crate) fn decode_ack_payload(bytes: &[u8]) -> Option<([u8; 16], &[u8])> {
48    if bytes.len() < ACK_PAYLOAD_MAGIC.len() + 16 || !bytes.starts_with(ACK_PAYLOAD_MAGIC) {
49        return None;
50    }
51
52    let mut tag = [0u8; 16];
53    tag.copy_from_slice(&bytes[ACK_PAYLOAD_MAGIC.len()..ACK_PAYLOAD_MAGIC.len() + 16]);
54    Some((tag, &bytes[ACK_PAYLOAD_MAGIC.len() + 16..]))
55}
56
57pub(crate) fn encode_ack_control(tag: [u8; 16], outcome: AckControlOutcome) -> Vec<u8> {
58    let mut bytes = Vec::with_capacity(ACK_CONTROL_MAGIC.len() + 18);
59    bytes.extend_from_slice(ACK_CONTROL_MAGIC);
60    bytes.extend_from_slice(&tag);
61    match outcome {
62        AckControlOutcome::Accepted => {
63            bytes.push(0);
64            bytes.push(0);
65        }
66        AckControlOutcome::Rejected(reason) => {
67            bytes.push(1);
68            bytes.push(match reason {
69                ReceiveRejectReason::ConsumerGone => 1,
70                ReceiveRejectReason::InvalidEnvelope => 2,
71                ReceiveRejectReason::NotSupported => 3,
72                ReceiveRejectReason::Unknown => 255,
73            });
74        }
75        AckControlOutcome::Closed(reason) => {
76            bytes.push(2);
77            bytes.push(match reason {
78                ConnectionCloseReason::Superseded => 1,
79                ConnectionCloseReason::ReaderExit => 2,
80                ConnectionCloseReason::PeerShutdown => 3,
81                ConnectionCloseReason::Banned => 4,
82                ConnectionCloseReason::LifecycleCleanup => 5,
83                ConnectionCloseReason::ApplicationClosed => 6,
84                ConnectionCloseReason::ConnectionClosed => 7,
85                ConnectionCloseReason::TimedOut => 8,
86                ConnectionCloseReason::Reset => 9,
87                ConnectionCloseReason::TransportError => 10,
88                ConnectionCloseReason::LocallyClosed => 11,
89                ConnectionCloseReason::VersionMismatch => 12,
90                ConnectionCloseReason::CidsExhausted => 13,
91                ConnectionCloseReason::Unknown => 255,
92            });
93        }
94    }
95    bytes
96}
97
98pub(crate) fn decode_ack_control(bytes: &[u8]) -> Option<([u8; 16], AckControlOutcome)> {
99    if bytes.len() != ACK_CONTROL_MAGIC.len() + 18 || !bytes.starts_with(ACK_CONTROL_MAGIC) {
100        return None;
101    }
102
103    let mut tag = [0u8; 16];
104    tag.copy_from_slice(&bytes[ACK_CONTROL_MAGIC.len()..ACK_CONTROL_MAGIC.len() + 16]);
105    let kind = bytes[ACK_CONTROL_MAGIC.len() + 16];
106    let value = bytes[ACK_CONTROL_MAGIC.len() + 17];
107    let outcome = match kind {
108        0 => AckControlOutcome::Accepted,
109        1 => AckControlOutcome::Rejected(match value {
110            1 => ReceiveRejectReason::ConsumerGone,
111            2 => ReceiveRejectReason::InvalidEnvelope,
112            3 => ReceiveRejectReason::NotSupported,
113            _ => ReceiveRejectReason::Unknown,
114        }),
115        2 => AckControlOutcome::Closed(match value {
116            1 => ConnectionCloseReason::Superseded,
117            2 => ConnectionCloseReason::ReaderExit,
118            3 => ConnectionCloseReason::PeerShutdown,
119            4 => ConnectionCloseReason::Banned,
120            5 => ConnectionCloseReason::LifecycleCleanup,
121            6 => ConnectionCloseReason::ApplicationClosed,
122            7 => ConnectionCloseReason::ConnectionClosed,
123            8 => ConnectionCloseReason::TimedOut,
124            9 => ConnectionCloseReason::Reset,
125            10 => ConnectionCloseReason::TransportError,
126            11 => ConnectionCloseReason::LocallyClosed,
127            12 => ConnectionCloseReason::VersionMismatch,
128            13 => ConnectionCloseReason::CidsExhausted,
129            _ => ConnectionCloseReason::Unknown,
130        }),
131        _ => return None,
132    };
133    Some((tag, outcome))
134}
135
136/// Encode a probe-liveness request envelope.
137///
138/// Carries only the 16-byte correlation tag — no user payload. Distinct magic
139/// from [`encode_ack_payload`] so the reader path can short-circuit probes
140/// without forwarding anything to the application receive channel.
141pub(crate) fn encode_probe_request(tag: [u8; 16]) -> Vec<u8> {
142    let mut bytes = Vec::with_capacity(PROBE_REQUEST_MAGIC.len() + tag.len());
143    bytes.extend_from_slice(PROBE_REQUEST_MAGIC);
144    bytes.extend_from_slice(&tag);
145    bytes
146}
147
148/// Decode a probe-liveness request envelope.
149///
150/// Returns `Some(tag)` when `bytes` is exactly a probe envelope. Probe responses
151/// are carried as ordinary [`AckControlOutcome::Accepted`] control frames so the
152/// existing waiter machinery resolves them.
153pub(crate) fn decode_probe_request(bytes: &[u8]) -> Option<[u8; 16]> {
154    if bytes.len() != PROBE_REQUEST_MAGIC.len() + 16 || !bytes.starts_with(PROBE_REQUEST_MAGIC) {
155        return None;
156    }
157    let mut tag = [0u8; 16];
158    tag.copy_from_slice(&bytes[PROBE_REQUEST_MAGIC.len()..PROBE_REQUEST_MAGIC.len() + 16]);
159    Some(tag)
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// A payload envelope decodes back to the tag and bytes it was built from.
    #[test]
    fn ack_payload_roundtrip() {
        let expected_tag = [0xAB; 16];
        let expected_body = b"hello";

        let wire = encode_ack_payload(expected_tag, expected_body);
        let (tag, body) = decode_ack_payload(&wire).expect("decode payload");

        assert_eq!(tag, expected_tag);
        assert_eq!(body, expected_body);
    }

    /// A rejected outcome survives an encode/decode cycle together with its tag.
    #[test]
    fn ack_control_roundtrip() {
        let expected_tag = [0xCD; 16];
        let sent = AckControlOutcome::Rejected(ReceiveRejectReason::ConsumerGone);

        let wire = encode_ack_control(expected_tag, sent);
        let (tag, received) = decode_ack_control(&wire).expect("decode control");

        assert_eq!(tag, expected_tag);
        assert_eq!(received, sent);
    }

    /// A probe envelope decodes back to the tag it was built from.
    #[test]
    fn probe_request_roundtrip() {
        let expected_tag = [0x5A; 16];
        let wire = encode_probe_request(expected_tag);
        let tag = decode_probe_request(&wire).expect("decode probe");
        assert_eq!(tag, expected_tag);
    }

    /// No decoder accepts another envelope kind's bytes.
    #[test]
    fn probe_envelope_distinct_from_ack_envelopes() {
        let tag = [0x77; 16];

        // Probe bytes fed to the two ACK decoders.
        let probe_wire = encode_probe_request(tag);
        assert!(
            decode_ack_payload(&probe_wire).is_none(),
            "probe envelope must not decode as ACK payload"
        );
        assert!(
            decode_ack_control(&probe_wire).is_none(),
            "probe envelope must not decode as ACK control frame"
        );

        // ACK bytes fed to the probe decoder.
        let payload_wire = encode_ack_payload(tag, b"hi");
        assert!(
            decode_probe_request(&payload_wire).is_none(),
            "ACK payload must not decode as probe envelope"
        );

        let control_wire = encode_ack_control(tag, AckControlOutcome::Accepted);
        assert!(
            decode_probe_request(&control_wire).is_none(),
            "ACK control frame must not decode as probe envelope"
        );
    }

    /// Probe envelopes one byte short or one byte long are rejected.
    #[test]
    fn probe_request_rejects_wrong_length() {
        // 15 trailing bytes is too short, 17 is too long; only 16 is valid.
        for tag_len in [15usize, 17] {
            let mut frame = PROBE_REQUEST_MAGIC.to_vec();
            frame.resize(frame.len() + tag_len, 0u8);
            assert!(decode_probe_request(&frame).is_none());
        }
    }
}
235}