Skip to main content

nnrp_core/
session.rs

1use crate::{
2    InFlightPolicy, NnrpError, SessionCloseReason, SessionCloseStatus, SessionPriorityClass,
3    SessionStatus,
4};
5
// Each length below is spelled as "offset of the last wire field + that field's width",
// matching the offsets used by the parse/write routines in this file.

/// Byte length of a serialized `SessionOpenMetadata` (last field: u64 at offset 40).
pub const SESSION_OPEN_METADATA_LEN: usize = 40 + 8;
/// Byte length of a serialized `SessionOpenAckMetadata` (last field: u32 at offset 52).
pub const SESSION_OPEN_ACK_METADATA_LEN: usize = 52 + 4;
/// Byte length of a serialized `SessionCloseMetadata` (last field: u32 at offset 20).
pub const SESSION_CLOSE_METADATA_LEN: usize = 20 + 4;
/// Byte length of a serialized `SessionCloseAckMetadata` (last field: u32 at offset 12).
pub const SESSION_CLOSE_ACK_METADATA_LEN: usize = 12 + 4;

/// Bits that may be set in `SessionOpenMetadata::session_flags` (the low four bits).
pub const SESSION_FLAGS_KNOWN_MASK: u8 = 0b0000_1111;
/// Bits that may be set in `SessionOpenAckMetadata::session_flags_ack` (the low five bits).
pub const SESSION_FLAGS_ACK_KNOWN_MASK: u32 = 0b0001_1111;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub struct SessionOpenMetadata {
16    pub requested_session_id: u32,
17    pub profile_id: u16,
18    pub priority_class: SessionPriorityClass,
19    pub session_flags: u8,
20    pub schema_id: u32,
21    pub schema_version: u32,
22    pub default_deadline_ms: u32,
23    pub max_in_flight_operations: u16,
24    pub lease_ttl_hint_ms: u32,
25    pub resume_token_bytes: u32,
26    pub auth_bytes: u32,
27    pub session_extension_bytes: u32,
28    pub client_session_tag: u64,
29}
30
31impl SessionOpenMetadata {
32    pub fn parse(source: &[u8]) -> Result<Self, NnrpError> {
33        require_len(source, SESSION_OPEN_METADATA_LEN)?;
34        let reserved0 = read_u16(source, 22);
35        validate_zero_u16("session_open.reserved0", reserved0)?;
36
37        let session_flags = source[7];
38        validate_mask_u8(session_flags, SESSION_FLAGS_KNOWN_MASK)?;
39
40        Ok(Self {
41            requested_session_id: read_u32(source, 0),
42            profile_id: read_u16(source, 4),
43            priority_class: SessionPriorityClass::try_from_u8(source[6])?,
44            session_flags,
45            schema_id: read_u32(source, 8),
46            schema_version: read_u32(source, 12),
47            default_deadline_ms: read_u32(source, 16),
48            max_in_flight_operations: read_u16(source, 20),
49            lease_ttl_hint_ms: read_u32(source, 24),
50            resume_token_bytes: read_u32(source, 28),
51            auth_bytes: read_u32(source, 32),
52            session_extension_bytes: read_u32(source, 36),
53            client_session_tag: read_u64(source, 40),
54        })
55    }
56
57    pub fn write(&self, destination: &mut [u8]) -> Result<(), NnrpError> {
58        require_destination_len(destination, SESSION_OPEN_METADATA_LEN)?;
59        validate_mask_u8(self.session_flags, SESSION_FLAGS_KNOWN_MASK)?;
60
61        destination[..SESSION_OPEN_METADATA_LEN].fill(0);
62        write_u32(destination, 0, self.requested_session_id);
63        write_u16(destination, 4, self.profile_id);
64        destination[6] = self.priority_class as u8;
65        destination[7] = self.session_flags;
66        write_u32(destination, 8, self.schema_id);
67        write_u32(destination, 12, self.schema_version);
68        write_u32(destination, 16, self.default_deadline_ms);
69        write_u16(destination, 20, self.max_in_flight_operations);
70        write_u32(destination, 24, self.lease_ttl_hint_ms);
71        write_u32(destination, 28, self.resume_token_bytes);
72        write_u32(destination, 32, self.auth_bytes);
73        write_u32(destination, 36, self.session_extension_bytes);
74        write_u64(destination, 40, self.client_session_tag);
75        Ok(())
76    }
77
78    pub fn to_bytes(&self) -> Result<[u8; SESSION_OPEN_METADATA_LEN], NnrpError> {
79        let mut bytes = [0u8; SESSION_OPEN_METADATA_LEN];
80        self.write(&mut bytes)?;
81        Ok(bytes)
82    }
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub struct SessionOpenAckMetadata {
87    pub session_id: u32,
88    pub accepted_profile_id: u16,
89    pub accepted_priority_class: SessionPriorityClass,
90    pub session_status: SessionStatus,
91    pub schema_id: u32,
92    pub schema_version: u32,
93    pub granted_operation_credit: u16,
94    pub max_in_flight_operations: u16,
95    pub lease_ttl_ms: u32,
96    pub resume_window_ms: u32,
97    pub resume_token_bytes: u32,
98    pub session_extension_bytes: u32,
99    pub server_session_tag: u64,
100    pub route_scope_id: u32,
101    pub session_error_code: u32,
102    pub session_flags_ack: u32,
103}
104
105impl SessionOpenAckMetadata {
106    pub fn parse(source: &[u8]) -> Result<Self, NnrpError> {
107        require_len(source, SESSION_OPEN_ACK_METADATA_LEN)?;
108        let session_flags_ack = read_u32(source, 52);
109        validate_mask_u32(session_flags_ack, SESSION_FLAGS_ACK_KNOWN_MASK)?;
110
111        Ok(Self {
112            session_id: read_u32(source, 0),
113            accepted_profile_id: read_u16(source, 4),
114            accepted_priority_class: SessionPriorityClass::try_from_u8(source[6])?,
115            session_status: SessionStatus::try_from_u8(source[7])?,
116            schema_id: read_u32(source, 8),
117            schema_version: read_u32(source, 12),
118            granted_operation_credit: read_u16(source, 16),
119            max_in_flight_operations: read_u16(source, 18),
120            lease_ttl_ms: read_u32(source, 20),
121            resume_window_ms: read_u32(source, 24),
122            resume_token_bytes: read_u32(source, 28),
123            session_extension_bytes: read_u32(source, 32),
124            server_session_tag: read_u64(source, 36),
125            route_scope_id: read_u32(source, 44),
126            session_error_code: read_u32(source, 48),
127            session_flags_ack,
128        })
129    }
130
131    pub fn write(&self, destination: &mut [u8]) -> Result<(), NnrpError> {
132        require_destination_len(destination, SESSION_OPEN_ACK_METADATA_LEN)?;
133        validate_mask_u32(self.session_flags_ack, SESSION_FLAGS_ACK_KNOWN_MASK)?;
134
135        destination[..SESSION_OPEN_ACK_METADATA_LEN].fill(0);
136        write_u32(destination, 0, self.session_id);
137        write_u16(destination, 4, self.accepted_profile_id);
138        destination[6] = self.accepted_priority_class as u8;
139        destination[7] = self.session_status as u8;
140        write_u32(destination, 8, self.schema_id);
141        write_u32(destination, 12, self.schema_version);
142        write_u16(destination, 16, self.granted_operation_credit);
143        write_u16(destination, 18, self.max_in_flight_operations);
144        write_u32(destination, 20, self.lease_ttl_ms);
145        write_u32(destination, 24, self.resume_window_ms);
146        write_u32(destination, 28, self.resume_token_bytes);
147        write_u32(destination, 32, self.session_extension_bytes);
148        write_u64(destination, 36, self.server_session_tag);
149        write_u32(destination, 44, self.route_scope_id);
150        write_u32(destination, 48, self.session_error_code);
151        write_u32(destination, 52, self.session_flags_ack);
152        Ok(())
153    }
154
155    pub fn to_bytes(&self) -> Result<[u8; SESSION_OPEN_ACK_METADATA_LEN], NnrpError> {
156        let mut bytes = [0u8; SESSION_OPEN_ACK_METADATA_LEN];
157        self.write(&mut bytes)?;
158        Ok(bytes)
159    }
160}
161
162#[derive(Debug, Clone, Copy, PartialEq, Eq)]
163pub struct SessionCloseMetadata {
164    pub close_reason: SessionCloseReason,
165    pub in_flight_policy: InFlightPolicy,
166    pub drain_timeout_ms: u32,
167    pub last_operation_id: u64,
168    pub session_error_code: u32,
169    pub session_close_tag: u32,
170}
171
172impl SessionCloseMetadata {
173    pub fn parse(source: &[u8]) -> Result<Self, NnrpError> {
174        require_len(source, SESSION_CLOSE_METADATA_LEN)?;
175        validate_zero_u8("session_close.reserved0", source[3])?;
176
177        Ok(Self {
178            close_reason: SessionCloseReason::try_from_u16(read_u16(source, 0))?,
179            in_flight_policy: InFlightPolicy::try_from_u8(source[2])?,
180            drain_timeout_ms: read_u32(source, 4),
181            last_operation_id: read_u64(source, 8),
182            session_error_code: read_u32(source, 16),
183            session_close_tag: read_u32(source, 20),
184        })
185    }
186
187    pub fn write(&self, destination: &mut [u8]) -> Result<(), NnrpError> {
188        require_destination_len(destination, SESSION_CLOSE_METADATA_LEN)?;
189
190        destination[..SESSION_CLOSE_METADATA_LEN].fill(0);
191        write_u16(destination, 0, self.close_reason as u16);
192        destination[2] = self.in_flight_policy as u8;
193        write_u32(destination, 4, self.drain_timeout_ms);
194        write_u64(destination, 8, self.last_operation_id);
195        write_u32(destination, 16, self.session_error_code);
196        write_u32(destination, 20, self.session_close_tag);
197        Ok(())
198    }
199
200    pub fn to_bytes(&self) -> Result<[u8; SESSION_CLOSE_METADATA_LEN], NnrpError> {
201        let mut bytes = [0u8; SESSION_CLOSE_METADATA_LEN];
202        self.write(&mut bytes)?;
203        Ok(bytes)
204    }
205}
206
207#[derive(Debug, Clone, Copy, PartialEq, Eq)]
208pub struct SessionCloseAckMetadata {
209    pub close_status: SessionCloseStatus,
210    pub last_operation_id: u64,
211    pub session_error_code: u32,
212}
213
214impl SessionCloseAckMetadata {
215    pub fn parse(source: &[u8]) -> Result<Self, NnrpError> {
216        require_len(source, SESSION_CLOSE_ACK_METADATA_LEN)?;
217        validate_zero_u8("session_close_ack.reserved0", source[1])?;
218        validate_zero_u16("session_close_ack.reserved1", read_u16(source, 2))?;
219
220        Ok(Self {
221            close_status: SessionCloseStatus::try_from_u8(source[0])?,
222            last_operation_id: read_u64(source, 4),
223            session_error_code: read_u32(source, 12),
224        })
225    }
226
227    pub fn write(&self, destination: &mut [u8]) -> Result<(), NnrpError> {
228        require_destination_len(destination, SESSION_CLOSE_ACK_METADATA_LEN)?;
229
230        destination[..SESSION_CLOSE_ACK_METADATA_LEN].fill(0);
231        destination[0] = self.close_status as u8;
232        write_u64(destination, 4, self.last_operation_id);
233        write_u32(destination, 12, self.session_error_code);
234        Ok(())
235    }
236
237    pub fn to_bytes(&self) -> Result<[u8; SESSION_CLOSE_ACK_METADATA_LEN], NnrpError> {
238        let mut bytes = [0u8; SESSION_CLOSE_ACK_METADATA_LEN];
239        self.write(&mut bytes)?;
240        Ok(bytes)
241    }
242}
243
244fn require_len(source: &[u8], expected: usize) -> Result<(), NnrpError> {
245    if source.len() < expected {
246        return Err(NnrpError::SourceTooShort {
247            expected,
248            actual: source.len(),
249        });
250    }
251
252    Ok(())
253}
254
255fn require_destination_len(destination: &[u8], expected: usize) -> Result<(), NnrpError> {
256    if destination.len() < expected {
257        return Err(NnrpError::DestinationTooShort {
258            expected,
259            actual: destination.len(),
260        });
261    }
262
263    Ok(())
264}
265
266fn validate_zero_u8(field: &'static str, value: u8) -> Result<(), NnrpError> {
267    if value != 0 {
268        return Err(NnrpError::NonZeroReservedField { field });
269    }
270
271    Ok(())
272}
273
274fn validate_zero_u16(field: &'static str, value: u16) -> Result<(), NnrpError> {
275    if value != 0 {
276        return Err(NnrpError::NonZeroReservedField { field });
277    }
278
279    Ok(())
280}
281
282fn validate_mask_u8(value: u8, allowed: u8) -> Result<(), NnrpError> {
283    if value & !allowed != 0 {
284        return Err(NnrpError::ReservedBitsSet {
285            value: value as u64,
286            allowed: allowed as u64,
287        });
288    }
289
290    Ok(())
291}
292
293fn validate_mask_u32(value: u32, allowed: u32) -> Result<(), NnrpError> {
294    if value & !allowed != 0 {
295        return Err(NnrpError::ReservedBitsSet {
296            value: value as u64,
297            allowed: allowed as u64,
298        });
299    }
300
301    Ok(())
302}
303
/// Reads a little-endian `u16` from the two bytes of `source` starting at `offset`.
///
/// # Panics
///
/// Panics when `source` does not contain the range `offset..offset + 2`; callers are
/// expected to have checked the length beforehand.
fn read_u16(source: &[u8], offset: usize) -> u16 {
    let mut raw = [0u8; 2];
    raw.copy_from_slice(&source[offset..offset + 2]);
    u16::from_le_bytes(raw)
}
307
/// Reads a little-endian `u32` from the four bytes of `source` starting at `offset`.
///
/// # Panics
///
/// Panics when `source` does not contain the range `offset..offset + 4`; callers are
/// expected to have checked the length beforehand.
fn read_u32(source: &[u8], offset: usize) -> u32 {
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&source[offset..offset + 4]);
    u32::from_le_bytes(raw)
}
311
/// Reads a little-endian `u64` from the eight bytes of `source` starting at `offset`.
///
/// # Panics
///
/// Panics when `source` does not contain the range `offset..offset + 8`; callers are
/// expected to have checked the length beforehand.
fn read_u64(source: &[u8], offset: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&source[offset..offset + 8]);
    u64::from_le_bytes(raw)
}
315
/// Stores `value` little-endian into the two bytes of `destination` starting at `offset`.
///
/// # Panics
///
/// Panics when `destination` does not contain the range `offset..offset + 2`; callers
/// are expected to have checked the length beforehand.
fn write_u16(destination: &mut [u8], offset: usize, value: u16) {
    let encoded = value.to_le_bytes();
    let target = &mut destination[offset..offset + 2];
    target.copy_from_slice(&encoded);
}
319
/// Stores `value` little-endian into the four bytes of `destination` starting at `offset`.
///
/// # Panics
///
/// Panics when `destination` does not contain the range `offset..offset + 4`; callers
/// are expected to have checked the length beforehand.
fn write_u32(destination: &mut [u8], offset: usize, value: u32) {
    let encoded = value.to_le_bytes();
    let target = &mut destination[offset..offset + 4];
    target.copy_from_slice(&encoded);
}
323
/// Stores `value` little-endian into the eight bytes of `destination` starting at `offset`.
///
/// # Panics
///
/// Panics when `destination` does not contain the range `offset..offset + 8`; callers
/// are expected to have checked the length beforehand.
fn write_u64(destination: &mut [u8], offset: usize, value: u64) {
    let encoded = value.to_le_bytes();
    let target = &mut destination[offset..offset + 8];
    target.copy_from_slice(&encoded);
}
327
#[cfg(test)]
mod tests {
    // Golden-vector tests: every metadata type is parsed from a fixed hex dump, compared
    // field-for-field against the expected value, and serialized back to the same bytes.

    use super::{
        SessionCloseAckMetadata, SessionCloseMetadata, SessionOpenAckMetadata, SessionOpenMetadata,
    };
    use crate::{
        InFlightPolicy, NnrpError, SessionCloseReason, SessionCloseStatus, SessionPriorityClass,
        SessionStatus,
    };

    /// Hex dump of a valid session-open metadata block shared by the session-open tests.
    const SESSION_OPEN_GOLDEN: &str = "2a000000020001050110000003000000f40100000400000030750000100000002000000008000000efcdab8967452301";

    #[test]
    fn session_open_metadata_round_trips_golden_vector() {
        let bytes = hex_to_bytes(SESSION_OPEN_GOLDEN);

        let metadata = SessionOpenMetadata::parse(&bytes).expect("metadata should parse");

        assert_eq!(
            metadata,
            SessionOpenMetadata {
                requested_session_id: 42,
                profile_id: 2,
                priority_class: SessionPriorityClass::Balanced,
                session_flags: 0x05,
                schema_id: 0x0000_1001,
                schema_version: 3,
                default_deadline_ms: 500,
                max_in_flight_operations: 4,
                lease_ttl_hint_ms: 30_000,
                resume_token_bytes: 16,
                auth_bytes: 32,
                session_extension_bytes: 8,
                client_session_tag: 0x0123_4567_89ab_cdef,
            }
        );

        let encoded = metadata.to_bytes().unwrap();
        assert_eq!(&encoded[..], &bytes[..]);
    }

    #[test]
    fn session_open_rejects_reserved_flags() {
        let mut bytes = hex_to_bytes(SESSION_OPEN_GOLDEN);
        // 0x10 is the lowest flag bit outside the known mask 0x0f.
        bytes[7] = 0x10;

        let outcome = SessionOpenMetadata::parse(&bytes);
        let expected = Err(NnrpError::ReservedBitsSet {
            value: 0x10,
            allowed: 0x0f,
        });

        assert_eq!(outcome, expected);
    }

    #[test]
    fn session_open_ack_metadata_round_trips_golden_vector() {
        let bytes = hex_to_bytes("2a0000000200010001100000030000000200040030750000c0d40100100000000800000021436587a9cbed0f070000000000000005000000");

        let metadata = SessionOpenAckMetadata::parse(&bytes).expect("metadata should parse");

        assert_eq!(
            metadata,
            SessionOpenAckMetadata {
                session_id: 42,
                accepted_profile_id: 2,
                accepted_priority_class: SessionPriorityClass::Balanced,
                session_status: SessionStatus::Opened,
                schema_id: 0x0000_1001,
                schema_version: 3,
                granted_operation_credit: 2,
                max_in_flight_operations: 4,
                lease_ttl_ms: 30_000,
                resume_window_ms: 120_000,
                resume_token_bytes: 16,
                session_extension_bytes: 8,
                server_session_tag: 0x0fed_cba9_8765_4321,
                route_scope_id: 7,
                session_error_code: 0,
                session_flags_ack: 5,
            }
        );

        let encoded = metadata.to_bytes().unwrap();
        assert_eq!(&encoded[..], &bytes[..]);
    }

    #[test]
    fn session_close_metadata_round_trips_golden_vector() {
        let bytes = hex_to_bytes("01000000e803000063000000000000000000000044332211");

        let metadata = SessionCloseMetadata::parse(&bytes).expect("metadata should parse");

        assert_eq!(
            metadata,
            SessionCloseMetadata {
                close_reason: SessionCloseReason::ClientShutdown,
                in_flight_policy: InFlightPolicy::Drain,
                drain_timeout_ms: 1000,
                last_operation_id: 99,
                session_error_code: 0,
                session_close_tag: 0x1122_3344,
            }
        );

        let encoded = metadata.to_bytes().unwrap();
        assert_eq!(&encoded[..], &bytes[..]);
    }

    #[test]
    fn session_close_ack_metadata_round_trips_golden_vector() {
        let bytes = hex_to_bytes("01000000630000000000000000000000");

        let metadata = SessionCloseAckMetadata::parse(&bytes).expect("metadata should parse");

        assert_eq!(
            metadata,
            SessionCloseAckMetadata {
                close_status: SessionCloseStatus::Draining,
                last_operation_id: 99,
                session_error_code: 0,
            }
        );

        let encoded = metadata.to_bytes().unwrap();
        assert_eq!(&encoded[..], &bytes[..]);
    }

    /// Decodes a string of two-digit hexadecimal pairs into bytes.
    ///
    /// Panics on an odd-length string or on any pair that is not valid hexadecimal.
    fn hex_to_bytes(hex: &str) -> Vec<u8> {
        assert_eq!(hex.len() % 2, 0);
        hex.as_bytes()
            .chunks_exact(2)
            .map(|pair| {
                let digits = std::str::from_utf8(pair).unwrap();
                u8::from_str_radix(digits, 16).unwrap()
            })
            .collect()
    }
}