1use std::collections::BTreeMap;
2
3use crate::{
4 CommonHeader, FlowScopeKind, FlowUpdateMetadata, MessageType, NnrpError,
5 SessionCloseAckMetadata, SessionCloseMetadata, SessionCloseStatus, SessionOpenAckMetadata,
6 SessionStatus,
7};
8
/// Lifecycle phase of a whole connection, as tracked by `ConnectionLifecycle`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionLifecycleState {
    /// The connection is usable; this is the state a new `ConnectionLifecycle` starts in
    /// and the only state in which session transitions are accepted.
    Open,
    /// The connection is treated as no longer open by lifecycle checks, but it can still
    /// be moved to `Closed`.
    Closing,
    /// Terminal state; closing the connection again is reported as an error.
    Closed,
}
15
/// Lifecycle phase of a single session inside a connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionLifecycleState {
    /// Installed by a SESSION_OPEN_ACK with status `Opened`; also the state a session
    /// returns to when a close is answered with `Rejected`.
    Open,
    /// Installed by a SESSION_OPEN_ACK with status `Resumed`.
    Resumed,
    /// A close has been started locally or acknowledged by the peer.
    Closing,
    /// The peer answered the close with `Draining`.
    Draining,
    /// The session is finished, either through a `Closed` close ack or because the
    /// whole connection was closed.
    Closed,
}

impl SessionLifecycleState {
    /// Returns `true` for every state except `Closed`.
    ///
    /// Sessions that are closing or draining still accept session-scoped messages.
    pub fn accepts_session_scoped_messages(self) -> bool {
        match self {
            Self::Open | Self::Resumed | Self::Closing | Self::Draining => true,
            Self::Closed => false,
        }
    }

    /// Returns `true` only while the session is `Open` or `Resumed`.
    pub fn accepts_new_operations(self) -> bool {
        match self {
            Self::Open | Self::Resumed => true,
            Self::Closing | Self::Draining | Self::Closed => false,
        }
    }
}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct SessionLifecycle {
40 pub session_id: u32,
41 pub state: SessionLifecycleState,
42 pub profile_id: u16,
43 pub priority_class: crate::SessionPriorityClass,
44 pub schema_id: u32,
45 pub schema_version: u32,
46 pub max_in_flight_operations: u16,
47 pub route_scope_id: u32,
48 pub last_operation_id: u64,
49 pub session_error_code: u32,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ConnectionLifecycle {
54 state: ConnectionLifecycleState,
55 sessions: BTreeMap<u32, SessionLifecycle>,
56}
57
58impl Default for ConnectionLifecycle {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
64impl ConnectionLifecycle {
65 pub fn new() -> Self {
66 Self {
67 state: ConnectionLifecycleState::Open,
68 sessions: BTreeMap::new(),
69 }
70 }
71
72 pub fn state(&self) -> ConnectionLifecycleState {
73 self.state
74 }
75
76 pub fn session_count(&self) -> usize {
77 self.sessions.len()
78 }
79
80 pub fn session(&self, session_id: u32) -> Option<&SessionLifecycle> {
81 self.sessions.get(&session_id)
82 }
83
84 pub fn close_connection(&mut self) -> Result<(), NnrpError> {
85 match self.state {
86 ConnectionLifecycleState::Open | ConnectionLifecycleState::Closing => {
87 self.state = ConnectionLifecycleState::Closed;
88 for session in self.sessions.values_mut() {
89 session.state = SessionLifecycleState::Closed;
90 }
91 Ok(())
92 }
93 ConnectionLifecycleState::Closed => Err(NnrpError::ConnectionAlreadyClosed),
94 }
95 }
96
97 pub fn apply_session_open_ack(
98 &mut self,
99 ack: &SessionOpenAckMetadata,
100 ) -> Result<(), NnrpError> {
101 self.require_connection_open()?;
102
103 match ack.session_status {
104 SessionStatus::Opened | SessionStatus::Resumed => {
105 if ack.session_id == 0 {
106 return Err(NnrpError::InvalidProtocolCombination {
107 rule: "successful SESSION_OPEN_ACK requires a non-zero session_id",
108 });
109 }
110 if self.sessions.contains_key(&ack.session_id) {
111 return Err(NnrpError::SessionAlreadyExists(ack.session_id));
112 }
113
114 let state = match ack.session_status {
115 SessionStatus::Opened => SessionLifecycleState::Open,
116 SessionStatus::Resumed => SessionLifecycleState::Resumed,
117 SessionStatus::Rejected | SessionStatus::RetryLater => unreachable!(),
118 };
119
120 self.sessions.insert(
121 ack.session_id,
122 SessionLifecycle {
123 session_id: ack.session_id,
124 state,
125 profile_id: ack.accepted_profile_id,
126 priority_class: ack.accepted_priority_class,
127 schema_id: ack.schema_id,
128 schema_version: ack.schema_version,
129 max_in_flight_operations: ack.max_in_flight_operations,
130 route_scope_id: ack.route_scope_id,
131 last_operation_id: 0,
132 session_error_code: ack.session_error_code,
133 },
134 );
135 }
136 SessionStatus::Rejected | SessionStatus::RetryLater => {
137 if ack.session_id != 0 {
138 return Err(NnrpError::InvalidProtocolCombination {
139 rule: "rejected SESSION_OPEN_ACK must not install a session_id",
140 });
141 }
142 }
143 }
144
145 Ok(())
146 }
147
148 pub fn begin_session_close(
149 &mut self,
150 header: &CommonHeader,
151 close: &SessionCloseMetadata,
152 ) -> Result<(), NnrpError> {
153 self.require_connection_open()?;
154 if header.message_type != MessageType::SessionClose {
155 return Err(NnrpError::InvalidProtocolCombination {
156 rule: "SESSION_CLOSE lifecycle transition requires a SESSION_CLOSE header",
157 });
158 }
159 if header.session_id == 0 {
160 return Err(NnrpError::InvalidProtocolCombination {
161 rule: "SESSION_CLOSE requires header.session_id!=0",
162 });
163 }
164
165 let session = self
166 .sessions
167 .get_mut(&header.session_id)
168 .ok_or(NnrpError::UnknownSession(header.session_id))?;
169 if !session.state.accepts_new_operations() {
170 return Err(NnrpError::SessionNotOpen(header.session_id));
171 }
172
173 session.state = SessionLifecycleState::Closing;
174 session.last_operation_id = close.last_operation_id;
175 session.session_error_code = close.session_error_code;
176 Ok(())
177 }
178
179 pub fn apply_session_close_ack(
180 &mut self,
181 header: &CommonHeader,
182 ack: &SessionCloseAckMetadata,
183 ) -> Result<(), NnrpError> {
184 self.require_connection_open()?;
185 if header.message_type != MessageType::SessionCloseAck {
186 return Err(NnrpError::InvalidProtocolCombination {
187 rule: "SESSION_CLOSE_ACK lifecycle transition requires a SESSION_CLOSE_ACK header",
188 });
189 }
190 if header.session_id == 0 {
191 return Err(NnrpError::InvalidProtocolCombination {
192 rule: "SESSION_CLOSE_ACK requires header.session_id!=0",
193 });
194 }
195
196 let session = self
197 .sessions
198 .get_mut(&header.session_id)
199 .ok_or(NnrpError::UnknownSession(header.session_id))?;
200
201 if !matches!(
202 session.state,
203 SessionLifecycleState::Open
204 | SessionLifecycleState::Resumed
205 | SessionLifecycleState::Closing
206 | SessionLifecycleState::Draining
207 ) {
208 return Err(NnrpError::SessionNotOpen(header.session_id));
209 }
210
211 session.state = match ack.close_status {
212 SessionCloseStatus::Acknowledged => SessionLifecycleState::Closing,
213 SessionCloseStatus::Draining => SessionLifecycleState::Draining,
214 SessionCloseStatus::Closed => SessionLifecycleState::Closed,
215 SessionCloseStatus::Rejected => SessionLifecycleState::Open,
216 };
217 session.last_operation_id = ack.last_operation_id;
218 session.session_error_code = ack.session_error_code;
219 Ok(())
220 }
221
222 pub fn validate_flow_update(
223 &self,
224 header: &CommonHeader,
225 metadata: &FlowUpdateMetadata,
226 ) -> Result<(), NnrpError> {
227 self.require_connection_open()?;
228 metadata.validate_routing(header)?;
229
230 match metadata.scope_kind {
231 FlowScopeKind::Connection => Ok(()),
232 FlowScopeKind::Session | FlowScopeKind::Operation => self
233 .sessions
234 .get(&header.session_id)
235 .filter(|session| session.state.accepts_session_scoped_messages())
236 .map(|_| ())
237 .ok_or(NnrpError::UnknownSession(header.session_id)),
238 }
239 }
240
241 fn require_connection_open(&self) -> Result<(), NnrpError> {
242 match self.state {
243 ConnectionLifecycleState::Open => Ok(()),
244 ConnectionLifecycleState::Closing | ConnectionLifecycleState::Closed => {
245 Err(NnrpError::ConnectionNotOpen)
246 }
247 }
248 }
249}
250
#[cfg(test)]
mod tests {
    use super::{ConnectionLifecycle, ConnectionLifecycleState, SessionLifecycleState};
    use crate::{
        BackpressureLevel, CommonHeader, FlowScopeKind, FlowUpdateMetadata, FlowUpdateReason,
        InFlightPolicy, MessageType, NnrpError, SessionCloseAckMetadata, SessionCloseMetadata,
        SessionCloseReason, SessionCloseStatus, SessionOpenAckMetadata, SessionPriorityClass,
        SessionStatus,
    };

    #[test]
    fn session_open_ack_installs_multiple_independent_sessions() {
        let mut connection = ConnectionLifecycle::default();

        for &session_id in &[42, 43] {
            connection
                .apply_session_open_ack(&open_ack(session_id))
                .unwrap();
        }

        assert_eq!(connection.session_count(), 2);
        assert_eq!(state_of(&connection, 42), SessionLifecycleState::Open);
        assert_eq!(state_of(&connection, 43), SessionLifecycleState::Open);
    }

    #[test]
    fn session_open_ack_rejects_invalid_or_duplicate_successes() {
        let mut connection = ConnectionLifecycle::new();

        // A successful ack must name a session.
        assert_eq!(
            connection.apply_session_open_ack(&open_ack(0)),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "successful SESSION_OPEN_ACK requires a non-zero session_id"
            })
        );

        let mut resumed_ack = open_ack(42);
        resumed_ack.session_status = SessionStatus::Resumed;
        connection.apply_session_open_ack(&resumed_ack).unwrap();
        assert_eq!(state_of(&connection, 42), SessionLifecycleState::Resumed);

        // Replaying the same ack must not replace the installed session.
        assert_eq!(
            connection.apply_session_open_ack(&resumed_ack),
            Err(NnrpError::SessionAlreadyExists(42))
        );
    }

    #[test]
    fn rejected_session_open_ack_does_not_install_session() {
        let mut connection = ConnectionLifecycle::new();
        let mut rejected_ack = open_ack(0);
        rejected_ack.session_status = SessionStatus::Rejected;
        rejected_ack.session_error_code = crate::SESSION_ERROR_PROFILE_UNSUPPORTED;

        connection.apply_session_open_ack(&rejected_ack).unwrap();
        assert_eq!(connection.session_count(), 0);

        rejected_ack.session_id = 42;
        assert_eq!(
            connection.apply_session_open_ack(&rejected_ack),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "rejected SESSION_OPEN_ACK must not install a session_id"
            })
        );
    }

    #[test]
    fn session_close_only_moves_target_session() {
        let mut connection = connection_with_sessions(&[42, 43]);

        connection
            .begin_session_close(&session_close_header(42), &close_metadata(7))
            .unwrap();

        assert_eq!(state_of(&connection, 42), SessionLifecycleState::Closing);
        assert_eq!(connection.session(42).unwrap().last_operation_id, 7);
        assert_eq!(state_of(&connection, 43), SessionLifecycleState::Open);
    }

    #[test]
    fn session_close_rejects_invalid_headers_and_non_open_sessions() {
        let mut connection = connection_with_sessions(&[42]);

        let ping_header = CommonHeader::new(MessageType::Ping, 24, 0);
        assert_eq!(
            connection.begin_session_close(&ping_header, &close_metadata(0)),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "SESSION_CLOSE lifecycle transition requires a SESSION_CLOSE header"
            })
        );

        // Freshly built header: session_id is left untouched on purpose.
        let unaddressed_header = CommonHeader::new(MessageType::SessionClose, 24, 0);
        assert_eq!(
            connection.begin_session_close(&unaddressed_header, &close_metadata(0)),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "SESSION_CLOSE requires header.session_id!=0"
            })
        );

        assert_eq!(
            connection.begin_session_close(&session_close_header(9), &close_metadata(0)),
            Err(NnrpError::UnknownSession(9))
        );

        // Once the peer reports the session closed, a new close must be refused.
        connection
            .apply_session_close_ack(
                &session_close_ack_header(42),
                &close_ack(SessionCloseStatus::Closed, 0),
            )
            .unwrap();
        assert_eq!(
            connection.begin_session_close(&session_close_header(42), &close_metadata(0)),
            Err(NnrpError::SessionNotOpen(42))
        );
    }

    #[test]
    fn draining_session_accepts_scope_routing_until_closed() {
        let mut connection = connection_with_sessions(&[42]);
        let ack_header = session_close_ack_header(42);

        connection
            .apply_session_close_ack(&ack_header, &close_ack(SessionCloseStatus::Draining, 10))
            .unwrap();

        let mut flow_header = CommonHeader::new(MessageType::FlowUpdate, 32, 0);
        flow_header.session_id = 42;
        let flow = flow_update(FlowScopeKind::Session, 0);
        connection
            .validate_flow_update(&flow_header, &flow)
            .unwrap();

        connection
            .apply_session_close_ack(&ack_header, &close_ack(SessionCloseStatus::Closed, 10))
            .unwrap();

        assert_eq!(
            connection.validate_flow_update(&flow_header, &flow),
            Err(NnrpError::UnknownSession(42))
        );
    }

    #[test]
    fn session_close_ack_rejects_invalid_headers_and_restores_rejected_close() {
        let mut connection = connection_with_sessions(&[42]);

        let ping_header = CommonHeader::new(MessageType::Ping, 16, 0);
        assert_eq!(
            connection.apply_session_close_ack(
                &ping_header,
                &close_ack(SessionCloseStatus::Acknowledged, 0)
            ),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "SESSION_CLOSE_ACK lifecycle transition requires a SESSION_CLOSE_ACK header"
            })
        );

        // Freshly built header: session_id is left untouched on purpose.
        let unaddressed_header = CommonHeader::new(MessageType::SessionCloseAck, 16, 0);
        assert_eq!(
            connection.apply_session_close_ack(
                &unaddressed_header,
                &close_ack(SessionCloseStatus::Acknowledged, 0)
            ),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "SESSION_CLOSE_ACK requires header.session_id!=0"
            })
        );

        let ack_header = session_close_ack_header(42);
        connection
            .apply_session_close_ack(&ack_header, &close_ack(SessionCloseStatus::Acknowledged, 1))
            .unwrap();
        assert_eq!(state_of(&connection, 42), SessionLifecycleState::Closing);

        connection
            .apply_session_close_ack(&ack_header, &close_ack(SessionCloseStatus::Rejected, 1))
            .unwrap();
        assert_eq!(state_of(&connection, 42), SessionLifecycleState::Open);
    }

    #[test]
    fn connection_scope_flow_update_does_not_require_session() {
        let connection = ConnectionLifecycle::new();
        let flow_header = CommonHeader::new(MessageType::FlowUpdate, 32, 0);
        let mut flow = flow_update(FlowScopeKind::Connection, 0);
        flow.connection_credit = 1;
        flow.session_credit = 0;

        connection
            .validate_flow_update(&flow_header, &flow)
            .unwrap();
    }

    #[test]
    fn closing_connection_closes_all_sessions() {
        let mut connection = connection_with_sessions(&[42, 43]);

        connection.close_connection().unwrap();

        assert_eq!(connection.state(), ConnectionLifecycleState::Closed);
        assert_eq!(state_of(&connection, 42), SessionLifecycleState::Closed);
        assert_eq!(state_of(&connection, 43), SessionLifecycleState::Closed);
        assert_eq!(
            connection.close_connection(),
            Err(NnrpError::ConnectionAlreadyClosed)
        );
        assert_eq!(
            connection.apply_session_open_ack(&open_ack(44)),
            Err(NnrpError::ConnectionNotOpen)
        );
    }

    /// Builds a fresh connection with one opened session per id in `session_ids`.
    fn connection_with_sessions(session_ids: &[u32]) -> ConnectionLifecycle {
        let mut connection = ConnectionLifecycle::new();
        for &session_id in session_ids {
            connection
                .apply_session_open_ack(&open_ack(session_id))
                .unwrap();
        }
        connection
    }

    /// Current state of an installed session; panics if the session is missing.
    fn state_of(connection: &ConnectionLifecycle, session_id: u32) -> SessionLifecycleState {
        connection.session(session_id).unwrap().state
    }

    /// SESSION_CLOSE header addressed to `session_id`.
    fn session_close_header(session_id: u32) -> CommonHeader {
        let mut header = CommonHeader::new(MessageType::SessionClose, 24, 0);
        header.session_id = session_id;
        header
    }

    /// SESSION_CLOSE_ACK header addressed to `session_id`.
    fn session_close_ack_header(session_id: u32) -> CommonHeader {
        let mut header = CommonHeader::new(MessageType::SessionCloseAck, 16, 0);
        header.session_id = session_id;
        header
    }

    /// Successful (`Opened`) SESSION_OPEN_ACK metadata for `session_id`.
    fn open_ack(session_id: u32) -> SessionOpenAckMetadata {
        SessionOpenAckMetadata {
            session_id,
            accepted_profile_id: 2,
            accepted_priority_class: SessionPriorityClass::Balanced,
            session_status: SessionStatus::Opened,
            schema_id: 0x1001,
            schema_version: 3,
            granted_operation_credit: 2,
            max_in_flight_operations: 4,
            lease_ttl_ms: 30_000,
            resume_window_ms: 120_000,
            resume_token_bytes: 16,
            session_extension_bytes: 8,
            server_session_tag: 0x0fed_cba9_8765_4321,
            route_scope_id: 7,
            session_error_code: 0,
            session_flags_ack: 5,
        }
    }

    /// SESSION_CLOSE metadata carrying the given `last_operation_id`.
    fn close_metadata(last_operation_id: u64) -> SessionCloseMetadata {
        SessionCloseMetadata {
            close_reason: SessionCloseReason::ClientShutdown,
            in_flight_policy: InFlightPolicy::Drain,
            drain_timeout_ms: 1000,
            last_operation_id,
            session_error_code: 0,
            session_close_tag: 0x1122_3344,
        }
    }

    /// SESSION_CLOSE_ACK metadata with the given status and `last_operation_id`.
    fn close_ack(
        close_status: SessionCloseStatus,
        last_operation_id: u64,
    ) -> SessionCloseAckMetadata {
        SessionCloseAckMetadata {
            close_status,
            last_operation_id,
            session_error_code: 0,
        }
    }

    /// FLOW_UPDATE metadata granting one session credit in the given scope.
    fn flow_update(scope_kind: FlowScopeKind, operation_id: u64) -> FlowUpdateMetadata {
        FlowUpdateMetadata {
            scope_kind,
            update_reason: FlowUpdateReason::Grant,
            backpressure_level: BackpressureLevel::None,
            connection_credit: 0,
            session_credit: 1,
            operation_credit: 0,
            operation_id,
            retry_after_ms: 0,
            credit_epoch: 1,
            flow_flags: 1,
        }
    }
}