1use crate::{
2 InFlightPolicy, NnrpError, SessionCloseReason, SessionCloseStatus, SessionPriorityClass,
3 SessionStatus,
4};
5
/// Size in bytes of the fixed session-open metadata record.
///
/// `SessionOpenMetadata::parse` rejects sources shorter than this and
/// `SessionOpenMetadata::write` rejects destinations shorter than this.
pub const SESSION_OPEN_METADATA_LEN: usize = 48;
/// Size in bytes of the fixed session-open-ack metadata record.
pub const SESSION_OPEN_ACK_METADATA_LEN: usize = 56;
/// Size in bytes of the fixed session-close metadata record.
pub const SESSION_CLOSE_METADATA_LEN: usize = 24;
/// Size in bytes of the fixed session-close-ack metadata record.
pub const SESSION_CLOSE_ACK_METADATA_LEN: usize = 16;

/// Bits that may be set in `SessionOpenMetadata::session_flags`.
///
/// Any bit outside this mask makes both parsing and writing fail with
/// `NnrpError::ReservedBitsSet`.
pub const SESSION_FLAGS_KNOWN_MASK: u8 = 0b0000_1111;
/// Bits that may be set in `SessionOpenAckMetadata::session_flags_ack`.
///
/// Any bit outside this mask makes both parsing and writing fail with
/// `NnrpError::ReservedBitsSet`.
pub const SESSION_FLAGS_ACK_KNOWN_MASK: u32 = 0b0001_1111;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub struct SessionOpenMetadata {
16 pub requested_session_id: u32,
17 pub profile_id: u16,
18 pub priority_class: SessionPriorityClass,
19 pub session_flags: u8,
20 pub schema_id: u32,
21 pub schema_version: u32,
22 pub default_deadline_ms: u32,
23 pub max_in_flight_operations: u16,
24 pub lease_ttl_hint_ms: u32,
25 pub resume_token_bytes: u32,
26 pub auth_bytes: u32,
27 pub session_extension_bytes: u32,
28 pub client_session_tag: u64,
29}
30
31impl SessionOpenMetadata {
32 pub fn parse(source: &[u8]) -> Result<Self, NnrpError> {
33 require_len(source, SESSION_OPEN_METADATA_LEN)?;
34 let reserved0 = read_u16(source, 22);
35 validate_zero_u16("session_open.reserved0", reserved0)?;
36
37 let session_flags = source[7];
38 validate_mask_u8(session_flags, SESSION_FLAGS_KNOWN_MASK)?;
39
40 Ok(Self {
41 requested_session_id: read_u32(source, 0),
42 profile_id: read_u16(source, 4),
43 priority_class: SessionPriorityClass::try_from_u8(source[6])?,
44 session_flags,
45 schema_id: read_u32(source, 8),
46 schema_version: read_u32(source, 12),
47 default_deadline_ms: read_u32(source, 16),
48 max_in_flight_operations: read_u16(source, 20),
49 lease_ttl_hint_ms: read_u32(source, 24),
50 resume_token_bytes: read_u32(source, 28),
51 auth_bytes: read_u32(source, 32),
52 session_extension_bytes: read_u32(source, 36),
53 client_session_tag: read_u64(source, 40),
54 })
55 }
56
57 pub fn write(&self, destination: &mut [u8]) -> Result<(), NnrpError> {
58 require_destination_len(destination, SESSION_OPEN_METADATA_LEN)?;
59 validate_mask_u8(self.session_flags, SESSION_FLAGS_KNOWN_MASK)?;
60
61 destination[..SESSION_OPEN_METADATA_LEN].fill(0);
62 write_u32(destination, 0, self.requested_session_id);
63 write_u16(destination, 4, self.profile_id);
64 destination[6] = self.priority_class as u8;
65 destination[7] = self.session_flags;
66 write_u32(destination, 8, self.schema_id);
67 write_u32(destination, 12, self.schema_version);
68 write_u32(destination, 16, self.default_deadline_ms);
69 write_u16(destination, 20, self.max_in_flight_operations);
70 write_u32(destination, 24, self.lease_ttl_hint_ms);
71 write_u32(destination, 28, self.resume_token_bytes);
72 write_u32(destination, 32, self.auth_bytes);
73 write_u32(destination, 36, self.session_extension_bytes);
74 write_u64(destination, 40, self.client_session_tag);
75 Ok(())
76 }
77
78 pub fn to_bytes(&self) -> Result<[u8; SESSION_OPEN_METADATA_LEN], NnrpError> {
79 let mut bytes = [0u8; SESSION_OPEN_METADATA_LEN];
80 self.write(&mut bytes)?;
81 Ok(bytes)
82 }
83}
84
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub struct SessionOpenAckMetadata {
87 pub session_id: u32,
88 pub accepted_profile_id: u16,
89 pub accepted_priority_class: SessionPriorityClass,
90 pub session_status: SessionStatus,
91 pub schema_id: u32,
92 pub schema_version: u32,
93 pub granted_operation_credit: u16,
94 pub max_in_flight_operations: u16,
95 pub lease_ttl_ms: u32,
96 pub resume_window_ms: u32,
97 pub resume_token_bytes: u32,
98 pub session_extension_bytes: u32,
99 pub server_session_tag: u64,
100 pub route_scope_id: u32,
101 pub session_error_code: u32,
102 pub session_flags_ack: u32,
103}
104
105impl SessionOpenAckMetadata {
106 pub fn parse(source: &[u8]) -> Result<Self, NnrpError> {
107 require_len(source, SESSION_OPEN_ACK_METADATA_LEN)?;
108 let session_flags_ack = read_u32(source, 52);
109 validate_mask_u32(session_flags_ack, SESSION_FLAGS_ACK_KNOWN_MASK)?;
110
111 Ok(Self {
112 session_id: read_u32(source, 0),
113 accepted_profile_id: read_u16(source, 4),
114 accepted_priority_class: SessionPriorityClass::try_from_u8(source[6])?,
115 session_status: SessionStatus::try_from_u8(source[7])?,
116 schema_id: read_u32(source, 8),
117 schema_version: read_u32(source, 12),
118 granted_operation_credit: read_u16(source, 16),
119 max_in_flight_operations: read_u16(source, 18),
120 lease_ttl_ms: read_u32(source, 20),
121 resume_window_ms: read_u32(source, 24),
122 resume_token_bytes: read_u32(source, 28),
123 session_extension_bytes: read_u32(source, 32),
124 server_session_tag: read_u64(source, 36),
125 route_scope_id: read_u32(source, 44),
126 session_error_code: read_u32(source, 48),
127 session_flags_ack,
128 })
129 }
130
131 pub fn write(&self, destination: &mut [u8]) -> Result<(), NnrpError> {
132 require_destination_len(destination, SESSION_OPEN_ACK_METADATA_LEN)?;
133 validate_mask_u32(self.session_flags_ack, SESSION_FLAGS_ACK_KNOWN_MASK)?;
134
135 destination[..SESSION_OPEN_ACK_METADATA_LEN].fill(0);
136 write_u32(destination, 0, self.session_id);
137 write_u16(destination, 4, self.accepted_profile_id);
138 destination[6] = self.accepted_priority_class as u8;
139 destination[7] = self.session_status as u8;
140 write_u32(destination, 8, self.schema_id);
141 write_u32(destination, 12, self.schema_version);
142 write_u16(destination, 16, self.granted_operation_credit);
143 write_u16(destination, 18, self.max_in_flight_operations);
144 write_u32(destination, 20, self.lease_ttl_ms);
145 write_u32(destination, 24, self.resume_window_ms);
146 write_u32(destination, 28, self.resume_token_bytes);
147 write_u32(destination, 32, self.session_extension_bytes);
148 write_u64(destination, 36, self.server_session_tag);
149 write_u32(destination, 44, self.route_scope_id);
150 write_u32(destination, 48, self.session_error_code);
151 write_u32(destination, 52, self.session_flags_ack);
152 Ok(())
153 }
154
155 pub fn to_bytes(&self) -> Result<[u8; SESSION_OPEN_ACK_METADATA_LEN], NnrpError> {
156 let mut bytes = [0u8; SESSION_OPEN_ACK_METADATA_LEN];
157 self.write(&mut bytes)?;
158 Ok(bytes)
159 }
160}
161
162#[derive(Debug, Clone, Copy, PartialEq, Eq)]
163pub struct SessionCloseMetadata {
164 pub close_reason: SessionCloseReason,
165 pub in_flight_policy: InFlightPolicy,
166 pub drain_timeout_ms: u32,
167 pub last_operation_id: u64,
168 pub session_error_code: u32,
169 pub session_close_tag: u32,
170}
171
172impl SessionCloseMetadata {
173 pub fn parse(source: &[u8]) -> Result<Self, NnrpError> {
174 require_len(source, SESSION_CLOSE_METADATA_LEN)?;
175 validate_zero_u8("session_close.reserved0", source[3])?;
176
177 Ok(Self {
178 close_reason: SessionCloseReason::try_from_u16(read_u16(source, 0))?,
179 in_flight_policy: InFlightPolicy::try_from_u8(source[2])?,
180 drain_timeout_ms: read_u32(source, 4),
181 last_operation_id: read_u64(source, 8),
182 session_error_code: read_u32(source, 16),
183 session_close_tag: read_u32(source, 20),
184 })
185 }
186
187 pub fn write(&self, destination: &mut [u8]) -> Result<(), NnrpError> {
188 require_destination_len(destination, SESSION_CLOSE_METADATA_LEN)?;
189
190 destination[..SESSION_CLOSE_METADATA_LEN].fill(0);
191 write_u16(destination, 0, self.close_reason as u16);
192 destination[2] = self.in_flight_policy as u8;
193 write_u32(destination, 4, self.drain_timeout_ms);
194 write_u64(destination, 8, self.last_operation_id);
195 write_u32(destination, 16, self.session_error_code);
196 write_u32(destination, 20, self.session_close_tag);
197 Ok(())
198 }
199
200 pub fn to_bytes(&self) -> Result<[u8; SESSION_CLOSE_METADATA_LEN], NnrpError> {
201 let mut bytes = [0u8; SESSION_CLOSE_METADATA_LEN];
202 self.write(&mut bytes)?;
203 Ok(bytes)
204 }
205}
206
207#[derive(Debug, Clone, Copy, PartialEq, Eq)]
208pub struct SessionCloseAckMetadata {
209 pub close_status: SessionCloseStatus,
210 pub last_operation_id: u64,
211 pub session_error_code: u32,
212}
213
214impl SessionCloseAckMetadata {
215 pub fn parse(source: &[u8]) -> Result<Self, NnrpError> {
216 require_len(source, SESSION_CLOSE_ACK_METADATA_LEN)?;
217 validate_zero_u8("session_close_ack.reserved0", source[1])?;
218 validate_zero_u16("session_close_ack.reserved1", read_u16(source, 2))?;
219
220 Ok(Self {
221 close_status: SessionCloseStatus::try_from_u8(source[0])?,
222 last_operation_id: read_u64(source, 4),
223 session_error_code: read_u32(source, 12),
224 })
225 }
226
227 pub fn write(&self, destination: &mut [u8]) -> Result<(), NnrpError> {
228 require_destination_len(destination, SESSION_CLOSE_ACK_METADATA_LEN)?;
229
230 destination[..SESSION_CLOSE_ACK_METADATA_LEN].fill(0);
231 destination[0] = self.close_status as u8;
232 write_u64(destination, 4, self.last_operation_id);
233 write_u32(destination, 12, self.session_error_code);
234 Ok(())
235 }
236
237 pub fn to_bytes(&self) -> Result<[u8; SESSION_CLOSE_ACK_METADATA_LEN], NnrpError> {
238 let mut bytes = [0u8; SESSION_CLOSE_ACK_METADATA_LEN];
239 self.write(&mut bytes)?;
240 Ok(bytes)
241 }
242}
243
244fn require_len(source: &[u8], expected: usize) -> Result<(), NnrpError> {
245 if source.len() < expected {
246 return Err(NnrpError::SourceTooShort {
247 expected,
248 actual: source.len(),
249 });
250 }
251
252 Ok(())
253}
254
255fn require_destination_len(destination: &[u8], expected: usize) -> Result<(), NnrpError> {
256 if destination.len() < expected {
257 return Err(NnrpError::DestinationTooShort {
258 expected,
259 actual: destination.len(),
260 });
261 }
262
263 Ok(())
264}
265
266fn validate_zero_u8(field: &'static str, value: u8) -> Result<(), NnrpError> {
267 if value != 0 {
268 return Err(NnrpError::NonZeroReservedField { field });
269 }
270
271 Ok(())
272}
273
274fn validate_zero_u16(field: &'static str, value: u16) -> Result<(), NnrpError> {
275 if value != 0 {
276 return Err(NnrpError::NonZeroReservedField { field });
277 }
278
279 Ok(())
280}
281
282fn validate_mask_u8(value: u8, allowed: u8) -> Result<(), NnrpError> {
283 if value & !allowed != 0 {
284 return Err(NnrpError::ReservedBitsSet {
285 value: value as u64,
286 allowed: allowed as u64,
287 });
288 }
289
290 Ok(())
291}
292
293fn validate_mask_u32(value: u32, allowed: u32) -> Result<(), NnrpError> {
294 if value & !allowed != 0 {
295 return Err(NnrpError::ReservedBitsSet {
296 value: value as u64,
297 allowed: allowed as u64,
298 });
299 }
300
301 Ok(())
302}
303
/// Reads a little-endian `u16` from `source[offset..offset + 2]`.
///
/// # Panics
///
/// Panics if that range lies outside `source`.
fn read_u16(source: &[u8], offset: usize) -> u16 {
    let mut raw = [0u8; 2];
    raw.copy_from_slice(&source[offset..offset + 2]);
    u16::from_le_bytes(raw)
}
307
/// Reads a little-endian `u32` from `source[offset..offset + 4]`.
///
/// # Panics
///
/// Panics if that range lies outside `source`.
fn read_u32(source: &[u8], offset: usize) -> u32 {
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&source[offset..offset + 4]);
    u32::from_le_bytes(raw)
}
311
/// Reads a little-endian `u64` from `source[offset..offset + 8]`.
///
/// # Panics
///
/// Panics if that range lies outside `source`.
fn read_u64(source: &[u8], offset: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&source[offset..offset + 8]);
    u64::from_le_bytes(raw)
}
315
/// Stores `value` as little-endian bytes in `destination[offset..offset + 2]`.
///
/// # Panics
///
/// Panics if that range lies outside `destination`.
fn write_u16(destination: &mut [u8], offset: usize, value: u16) {
    let encoded = value.to_le_bytes();
    let target = &mut destination[offset..offset + 2];
    target.copy_from_slice(&encoded);
}
319
/// Stores `value` as little-endian bytes in `destination[offset..offset + 4]`.
///
/// # Panics
///
/// Panics if that range lies outside `destination`.
fn write_u32(destination: &mut [u8], offset: usize, value: u32) {
    let encoded = value.to_le_bytes();
    let target = &mut destination[offset..offset + 4];
    target.copy_from_slice(&encoded);
}
323
/// Stores `value` as little-endian bytes in `destination[offset..offset + 8]`.
///
/// # Panics
///
/// Panics if that range lies outside `destination`.
fn write_u64(destination: &mut [u8], offset: usize, value: u64) {
    let encoded = value.to_le_bytes();
    let target = &mut destination[offset..offset + 8];
    target.copy_from_slice(&encoded);
}
327
#[cfg(test)]
mod tests {
    use super::{
        SessionCloseAckMetadata, SessionCloseMetadata, SessionOpenAckMetadata, SessionOpenMetadata,
    };
    use crate::{
        InFlightPolicy, NnrpError, SessionCloseReason, SessionCloseStatus, SessionPriorityClass,
        SessionStatus,
    };

    // Hex-encoded golden wire images shared by the tests below.
    const SESSION_OPEN_HEX: &str = "2a000000020001050110000003000000f40100000400000030750000100000002000000008000000efcdab8967452301";
    const SESSION_OPEN_ACK_HEX: &str = "2a0000000200010001100000030000000200040030750000c0d40100100000000800000021436587a9cbed0f070000000000000005000000";
    const SESSION_CLOSE_HEX: &str = "01000000e803000063000000000000000000000044332211";
    const SESSION_CLOSE_ACK_HEX: &str = "01000000630000000000000000000000";

    #[test]
    fn session_open_metadata_round_trips_golden_vector() {
        let bytes = hex_to_bytes(SESSION_OPEN_HEX);
        let expected = SessionOpenMetadata {
            requested_session_id: 42,
            profile_id: 2,
            priority_class: SessionPriorityClass::Balanced,
            session_flags: 0x05,
            schema_id: 0x0000_1001,
            schema_version: 3,
            default_deadline_ms: 500,
            max_in_flight_operations: 4,
            lease_ttl_hint_ms: 30_000,
            resume_token_bytes: 16,
            auth_bytes: 32,
            session_extension_bytes: 8,
            client_session_tag: 0x0123_4567_89ab_cdef,
        };

        let metadata = SessionOpenMetadata::parse(&bytes).expect("metadata should parse");

        assert_eq!(metadata, expected);
        assert_eq!(&metadata.to_bytes().unwrap()[..], &bytes[..]);
    }

    #[test]
    fn session_open_rejects_reserved_flags() {
        let mut bytes = hex_to_bytes(SESSION_OPEN_HEX);
        // Bit 4 lies outside the known flag mask 0x0f.
        bytes[7] = 0x10;

        let outcome = SessionOpenMetadata::parse(&bytes);

        let expected = Err(NnrpError::ReservedBitsSet {
            value: 0x10,
            allowed: 0x0f,
        });
        assert_eq!(outcome, expected);
    }

    #[test]
    fn session_open_ack_metadata_round_trips_golden_vector() {
        let bytes = hex_to_bytes(SESSION_OPEN_ACK_HEX);
        let expected = SessionOpenAckMetadata {
            session_id: 42,
            accepted_profile_id: 2,
            accepted_priority_class: SessionPriorityClass::Balanced,
            session_status: SessionStatus::Opened,
            schema_id: 0x0000_1001,
            schema_version: 3,
            granted_operation_credit: 2,
            max_in_flight_operations: 4,
            lease_ttl_ms: 30_000,
            resume_window_ms: 120_000,
            resume_token_bytes: 16,
            session_extension_bytes: 8,
            server_session_tag: 0x0fed_cba9_8765_4321,
            route_scope_id: 7,
            session_error_code: 0,
            session_flags_ack: 5,
        };

        let metadata = SessionOpenAckMetadata::parse(&bytes).expect("metadata should parse");

        assert_eq!(metadata, expected);
        assert_eq!(&metadata.to_bytes().unwrap()[..], &bytes[..]);
    }

    #[test]
    fn session_close_metadata_round_trips_golden_vector() {
        let bytes = hex_to_bytes(SESSION_CLOSE_HEX);
        let expected = SessionCloseMetadata {
            close_reason: SessionCloseReason::ClientShutdown,
            in_flight_policy: InFlightPolicy::Drain,
            drain_timeout_ms: 1000,
            last_operation_id: 99,
            session_error_code: 0,
            session_close_tag: 0x1122_3344,
        };

        let metadata = SessionCloseMetadata::parse(&bytes).expect("metadata should parse");

        assert_eq!(metadata, expected);
        assert_eq!(&metadata.to_bytes().unwrap()[..], &bytes[..]);
    }

    #[test]
    fn session_close_ack_metadata_round_trips_golden_vector() {
        let bytes = hex_to_bytes(SESSION_CLOSE_ACK_HEX);
        let expected = SessionCloseAckMetadata {
            close_status: SessionCloseStatus::Draining,
            last_operation_id: 99,
            session_error_code: 0,
        };

        let metadata = SessionCloseAckMetadata::parse(&bytes).expect("metadata should parse");

        assert_eq!(metadata, expected);
        assert_eq!(&metadata.to_bytes().unwrap()[..], &bytes[..]);
    }

    /// Decodes an even-length string of hex digits into bytes, panicking on
    /// malformed input.
    fn hex_to_bytes(hex: &str) -> Vec<u8> {
        assert_eq!(hex.len() % 2, 0);
        hex.as_bytes()
            .chunks(2)
            .map(|pair| {
                let digits = std::str::from_utf8(pair).unwrap();
                u8::from_str_radix(digits, 16).unwrap()
            })
            .collect()
    }
}