// nnrp_core/lifecycle.rs

1use std::collections::BTreeMap;
2
3use crate::{
4    CommonHeader, FlowScopeKind, FlowUpdateMetadata, MessageType, NnrpError,
5    SessionCloseAckMetadata, SessionCloseMetadata, SessionCloseStatus, SessionOpenAckMetadata,
6    SessionStatus,
7};
8
/// Lifecycle phase of a whole connection, as tracked by `ConnectionLifecycle`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConnectionLifecycleState {
    /// State assigned by `ConnectionLifecycle::new`; the only state in which
    /// the session-level transitions of `ConnectionLifecycle` are accepted.
    Open,
    /// Treated as "not open" by the session-level transitions, while
    /// `close_connection` still moves it to `Closed`. No method visible in
    /// this file assigns this state.
    Closing,
    /// Set by `close_connection`; closing again reports
    /// `NnrpError::ConnectionAlreadyClosed`.
    Closed,
}
15
/// Lifecycle phase of a single session inside a connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SessionLifecycleState {
    /// Installed when a SESSION_OPEN_ACK reports `SessionStatus::Opened`; also
    /// restored when a SESSION_CLOSE_ACK reports `SessionCloseStatus::Rejected`.
    Open,
    /// Installed when a SESSION_OPEN_ACK reports `SessionStatus::Resumed`.
    Resumed,
    /// Entered via `begin_session_close`, or when a SESSION_CLOSE_ACK reports
    /// `SessionCloseStatus::Acknowledged`.
    Closing,
    /// Entered when a SESSION_CLOSE_ACK reports `SessionCloseStatus::Draining`.
    Draining,
    /// Entered when a SESSION_CLOSE_ACK reports `SessionCloseStatus::Closed`,
    /// or when the owning connection is closed.
    Closed,
}
24
25impl SessionLifecycleState {
26    pub fn accepts_session_scoped_messages(self) -> bool {
27        matches!(
28            self,
29            Self::Open | Self::Resumed | Self::Closing | Self::Draining
30        )
31    }
32
33    pub fn accepts_new_operations(self) -> bool {
34        matches!(self, Self::Open | Self::Resumed)
35    }
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct SessionLifecycle {
40    pub session_id: u32,
41    pub state: SessionLifecycleState,
42    pub profile_id: u16,
43    pub priority_class: crate::SessionPriorityClass,
44    pub schema_id: u32,
45    pub schema_version: u32,
46    pub max_in_flight_operations: u16,
47    pub route_scope_id: u32,
48    pub last_operation_id: u64,
49    pub session_error_code: u32,
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ConnectionLifecycle {
54    state: ConnectionLifecycleState,
55    sessions: BTreeMap<u32, SessionLifecycle>,
56}
57
58impl Default for ConnectionLifecycle {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64impl ConnectionLifecycle {
65    pub fn new() -> Self {
66        Self {
67            state: ConnectionLifecycleState::Open,
68            sessions: BTreeMap::new(),
69        }
70    }
71
72    pub fn state(&self) -> ConnectionLifecycleState {
73        self.state
74    }
75
76    pub fn session_count(&self) -> usize {
77        self.sessions.len()
78    }
79
80    pub fn session(&self, session_id: u32) -> Option<&SessionLifecycle> {
81        self.sessions.get(&session_id)
82    }
83
84    pub fn close_connection(&mut self) -> Result<(), NnrpError> {
85        match self.state {
86            ConnectionLifecycleState::Open | ConnectionLifecycleState::Closing => {
87                self.state = ConnectionLifecycleState::Closed;
88                for session in self.sessions.values_mut() {
89                    session.state = SessionLifecycleState::Closed;
90                }
91                Ok(())
92            }
93            ConnectionLifecycleState::Closed => Err(NnrpError::ConnectionAlreadyClosed),
94        }
95    }
96
97    pub fn apply_session_open_ack(
98        &mut self,
99        ack: &SessionOpenAckMetadata,
100    ) -> Result<(), NnrpError> {
101        self.require_connection_open()?;
102
103        match ack.session_status {
104            SessionStatus::Opened | SessionStatus::Resumed => {
105                if ack.session_id == 0 {
106                    return Err(NnrpError::InvalidProtocolCombination {
107                        rule: "successful SESSION_OPEN_ACK requires a non-zero session_id",
108                    });
109                }
110                if self.sessions.contains_key(&ack.session_id) {
111                    return Err(NnrpError::SessionAlreadyExists(ack.session_id));
112                }
113
114                let state = match ack.session_status {
115                    SessionStatus::Opened => SessionLifecycleState::Open,
116                    SessionStatus::Resumed => SessionLifecycleState::Resumed,
117                    SessionStatus::Rejected | SessionStatus::RetryLater => unreachable!(),
118                };
119
120                self.sessions.insert(
121                    ack.session_id,
122                    SessionLifecycle {
123                        session_id: ack.session_id,
124                        state,
125                        profile_id: ack.accepted_profile_id,
126                        priority_class: ack.accepted_priority_class,
127                        schema_id: ack.schema_id,
128                        schema_version: ack.schema_version,
129                        max_in_flight_operations: ack.max_in_flight_operations,
130                        route_scope_id: ack.route_scope_id,
131                        last_operation_id: 0,
132                        session_error_code: ack.session_error_code,
133                    },
134                );
135            }
136            SessionStatus::Rejected | SessionStatus::RetryLater => {
137                if ack.session_id != 0 {
138                    return Err(NnrpError::InvalidProtocolCombination {
139                        rule: "rejected SESSION_OPEN_ACK must not install a session_id",
140                    });
141                }
142            }
143        }
144
145        Ok(())
146    }
147
148    pub fn begin_session_close(
149        &mut self,
150        header: &CommonHeader,
151        close: &SessionCloseMetadata,
152    ) -> Result<(), NnrpError> {
153        self.require_connection_open()?;
154        if header.message_type != MessageType::SessionClose {
155            return Err(NnrpError::InvalidProtocolCombination {
156                rule: "SESSION_CLOSE lifecycle transition requires a SESSION_CLOSE header",
157            });
158        }
159        if header.session_id == 0 {
160            return Err(NnrpError::InvalidProtocolCombination {
161                rule: "SESSION_CLOSE requires header.session_id!=0",
162            });
163        }
164
165        let session = self
166            .sessions
167            .get_mut(&header.session_id)
168            .ok_or(NnrpError::UnknownSession(header.session_id))?;
169        if !session.state.accepts_new_operations() {
170            return Err(NnrpError::SessionNotOpen(header.session_id));
171        }
172
173        session.state = SessionLifecycleState::Closing;
174        session.last_operation_id = close.last_operation_id;
175        session.session_error_code = close.session_error_code;
176        Ok(())
177    }
178
179    pub fn apply_session_close_ack(
180        &mut self,
181        header: &CommonHeader,
182        ack: &SessionCloseAckMetadata,
183    ) -> Result<(), NnrpError> {
184        self.require_connection_open()?;
185        if header.message_type != MessageType::SessionCloseAck {
186            return Err(NnrpError::InvalidProtocolCombination {
187                rule: "SESSION_CLOSE_ACK lifecycle transition requires a SESSION_CLOSE_ACK header",
188            });
189        }
190        if header.session_id == 0 {
191            return Err(NnrpError::InvalidProtocolCombination {
192                rule: "SESSION_CLOSE_ACK requires header.session_id!=0",
193            });
194        }
195
196        let session = self
197            .sessions
198            .get_mut(&header.session_id)
199            .ok_or(NnrpError::UnknownSession(header.session_id))?;
200
201        if !matches!(
202            session.state,
203            SessionLifecycleState::Open
204                | SessionLifecycleState::Resumed
205                | SessionLifecycleState::Closing
206                | SessionLifecycleState::Draining
207        ) {
208            return Err(NnrpError::SessionNotOpen(header.session_id));
209        }
210
211        session.state = match ack.close_status {
212            SessionCloseStatus::Acknowledged => SessionLifecycleState::Closing,
213            SessionCloseStatus::Draining => SessionLifecycleState::Draining,
214            SessionCloseStatus::Closed => SessionLifecycleState::Closed,
215            SessionCloseStatus::Rejected => SessionLifecycleState::Open,
216        };
217        session.last_operation_id = ack.last_operation_id;
218        session.session_error_code = ack.session_error_code;
219        Ok(())
220    }
221
222    pub fn validate_flow_update(
223        &self,
224        header: &CommonHeader,
225        metadata: &FlowUpdateMetadata,
226    ) -> Result<(), NnrpError> {
227        self.require_connection_open()?;
228        metadata.validate_routing(header)?;
229
230        match metadata.scope_kind {
231            FlowScopeKind::Connection => Ok(()),
232            FlowScopeKind::Session | FlowScopeKind::Operation => self
233                .sessions
234                .get(&header.session_id)
235                .filter(|session| session.state.accepts_session_scoped_messages())
236                .map(|_| ())
237                .ok_or(NnrpError::UnknownSession(header.session_id)),
238        }
239    }
240
241    fn require_connection_open(&self) -> Result<(), NnrpError> {
242        match self.state {
243            ConnectionLifecycleState::Open => Ok(()),
244            ConnectionLifecycleState::Closing | ConnectionLifecycleState::Closed => {
245                Err(NnrpError::ConnectionNotOpen)
246            }
247        }
248    }
249}
250
#[cfg(test)]
mod tests {
    use crate::{
        BackpressureLevel, CommonHeader, FlowScopeKind, FlowUpdateMetadata, FlowUpdateReason,
        InFlightPolicy, MessageType, NnrpError, SessionCloseAckMetadata, SessionCloseMetadata,
        SessionCloseReason, SessionCloseStatus, SessionOpenAckMetadata, SessionPriorityClass,
        SessionStatus,
    };

    use super::{ConnectionLifecycle, ConnectionLifecycleState, SessionLifecycleState};

    /// Two successful open acks with distinct ids yield two independent `Open` sessions.
    #[test]
    fn session_open_ack_installs_multiple_independent_sessions() {
        let mut connection = ConnectionLifecycle::default();

        for session_id in [42, 43] {
            connection
                .apply_session_open_ack(&open_ack(session_id))
                .unwrap();
        }

        assert_eq!(connection.session_count(), 2);
        for session_id in [42, 43] {
            assert_eq!(
                connection.session(session_id).unwrap().state,
                SessionLifecycleState::Open
            );
        }
    }

    /// A successful ack needs a non-zero id and must not reuse an installed id.
    #[test]
    fn session_open_ack_rejects_invalid_or_duplicate_successes() {
        let mut connection = ConnectionLifecycle::new();

        let mut ack = open_ack(0);
        let zero_id_result = connection.apply_session_open_ack(&ack);
        assert_eq!(
            zero_id_result,
            Err(NnrpError::InvalidProtocolCombination {
                rule: "successful SESSION_OPEN_ACK requires a non-zero session_id"
            })
        );

        ack.session_id = 42;
        ack.session_status = SessionStatus::Resumed;
        connection.apply_session_open_ack(&ack).unwrap();
        assert_eq!(
            connection.session(42).unwrap().state,
            SessionLifecycleState::Resumed
        );

        let duplicate_result = connection.apply_session_open_ack(&ack);
        assert_eq!(duplicate_result, Err(NnrpError::SessionAlreadyExists(42)));
    }

    /// A rejected ack installs nothing and must carry `session_id == 0`.
    #[test]
    fn rejected_session_open_ack_does_not_install_session() {
        let mut connection = ConnectionLifecycle::new();
        let mut ack = SessionOpenAckMetadata {
            session_status: SessionStatus::Rejected,
            session_error_code: crate::SESSION_ERROR_PROFILE_UNSUPPORTED,
            ..open_ack(0)
        };

        connection.apply_session_open_ack(&ack).unwrap();
        assert_eq!(connection.session_count(), 0);

        ack.session_id = 42;
        let with_id_result = connection.apply_session_open_ack(&ack);
        assert_eq!(
            with_id_result,
            Err(NnrpError::InvalidProtocolCombination {
                rule: "rejected SESSION_OPEN_ACK must not install a session_id"
            })
        );
    }

    /// Closing one session leaves its sibling untouched.
    #[test]
    fn session_close_only_moves_target_session() {
        let mut connection = ConnectionLifecycle::new();
        connection.apply_session_open_ack(&open_ack(42)).unwrap();
        connection.apply_session_open_ack(&open_ack(43)).unwrap();

        connection
            .begin_session_close(&session_close_header(42), &close_metadata(7))
            .unwrap();

        let closing = connection.session(42).unwrap();
        assert_eq!(closing.state, SessionLifecycleState::Closing);
        assert_eq!(closing.last_operation_id, 7);

        let untouched = connection.session(43).unwrap();
        assert_eq!(untouched.state, SessionLifecycleState::Open);
    }

    /// `begin_session_close` validates the header, the session id and the session state.
    #[test]
    fn session_close_rejects_invalid_headers_and_non_open_sessions() {
        let mut connection = ConnectionLifecycle::new();
        connection.apply_session_open_ack(&open_ack(42)).unwrap();

        let ping_header = CommonHeader::new(MessageType::Ping, 24, 0);
        assert_eq!(
            connection.begin_session_close(&ping_header, &close_metadata(0)),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "SESSION_CLOSE lifecycle transition requires a SESSION_CLOSE header"
            })
        );

        // Header left exactly as `CommonHeader::new` produced it (no session id set).
        let sessionless_header = CommonHeader::new(MessageType::SessionClose, 24, 0);
        assert_eq!(
            connection.begin_session_close(&sessionless_header, &close_metadata(0)),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "SESSION_CLOSE requires header.session_id!=0"
            })
        );

        assert_eq!(
            connection.begin_session_close(&session_close_header(9), &close_metadata(0)),
            Err(NnrpError::UnknownSession(9))
        );

        // Close session 42 through an ack, then try to begin closing it again.
        connection
            .apply_session_close_ack(
                &session_close_ack_header(42),
                &close_ack(SessionCloseStatus::Closed, 0),
            )
            .unwrap();
        assert_eq!(
            connection.begin_session_close(&session_close_header(42), &close_metadata(0)),
            Err(NnrpError::SessionNotOpen(42))
        );
    }

    /// A draining session still validates session-scoped flow updates; a closed one does not.
    #[test]
    fn draining_session_accepts_scope_routing_until_closed() {
        let mut connection = ConnectionLifecycle::new();
        connection.apply_session_open_ack(&open_ack(42)).unwrap();

        let ack_header = session_close_ack_header(42);
        let mut ack = close_ack(SessionCloseStatus::Draining, 10);
        connection
            .apply_session_close_ack(&ack_header, &ack)
            .unwrap();

        let flow_header = flow_update_header(42);
        let flow = flow_update(FlowScopeKind::Session, 0);
        connection
            .validate_flow_update(&flow_header, &flow)
            .unwrap();

        ack.close_status = SessionCloseStatus::Closed;
        connection
            .apply_session_close_ack(&ack_header, &ack)
            .unwrap();

        assert_eq!(
            connection.validate_flow_update(&flow_header, &flow),
            Err(NnrpError::UnknownSession(42))
        );
    }

    /// `apply_session_close_ack` validates the header; a rejected close returns the session to `Open`.
    #[test]
    fn session_close_ack_rejects_invalid_headers_and_restores_rejected_close() {
        let mut connection = ConnectionLifecycle::new();
        connection.apply_session_open_ack(&open_ack(42)).unwrap();

        let ping_header = CommonHeader::new(MessageType::Ping, 16, 0);
        assert_eq!(
            connection.apply_session_close_ack(
                &ping_header,
                &close_ack(SessionCloseStatus::Acknowledged, 0)
            ),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "SESSION_CLOSE_ACK lifecycle transition requires a SESSION_CLOSE_ACK header"
            })
        );

        // Header left exactly as `CommonHeader::new` produced it (no session id set).
        let sessionless_header = CommonHeader::new(MessageType::SessionCloseAck, 16, 0);
        assert_eq!(
            connection.apply_session_close_ack(
                &sessionless_header,
                &close_ack(SessionCloseStatus::Acknowledged, 0)
            ),
            Err(NnrpError::InvalidProtocolCombination {
                rule: "SESSION_CLOSE_ACK requires header.session_id!=0"
            })
        );

        let ack_header = session_close_ack_header(42);
        connection
            .apply_session_close_ack(&ack_header, &close_ack(SessionCloseStatus::Acknowledged, 1))
            .unwrap();
        assert_eq!(
            connection.session(42).unwrap().state,
            SessionLifecycleState::Closing
        );

        connection
            .apply_session_close_ack(&ack_header, &close_ack(SessionCloseStatus::Rejected, 1))
            .unwrap();
        assert_eq!(
            connection.session(42).unwrap().state,
            SessionLifecycleState::Open
        );
    }

    /// Connection-scoped flow updates validate even when no session exists.
    #[test]
    fn connection_scope_flow_update_does_not_require_session() {
        let connection = ConnectionLifecycle::new();
        let header = CommonHeader::new(MessageType::FlowUpdate, 32, 0);
        let flow = FlowUpdateMetadata {
            connection_credit: 1,
            session_credit: 0,
            ..flow_update(FlowScopeKind::Connection, 0)
        };

        connection.validate_flow_update(&header, &flow).unwrap();
    }

    /// Closing the connection closes every session and blocks further transitions.
    #[test]
    fn closing_connection_closes_all_sessions() {
        let mut connection = ConnectionLifecycle::new();
        connection.apply_session_open_ack(&open_ack(42)).unwrap();
        connection.apply_session_open_ack(&open_ack(43)).unwrap();

        connection.close_connection().unwrap();

        assert_eq!(connection.state(), ConnectionLifecycleState::Closed);
        for session_id in [42, 43] {
            assert_eq!(
                connection.session(session_id).unwrap().state,
                SessionLifecycleState::Closed
            );
        }
        assert_eq!(
            connection.close_connection(),
            Err(NnrpError::ConnectionAlreadyClosed)
        );
        assert_eq!(
            connection.apply_session_open_ack(&open_ack(44)),
            Err(NnrpError::ConnectionNotOpen)
        );
    }

    /// SESSION_CLOSE header addressed to `session_id`.
    fn session_close_header(session_id: u32) -> CommonHeader {
        let mut header = CommonHeader::new(MessageType::SessionClose, 24, 0);
        header.session_id = session_id;
        header
    }

    /// SESSION_CLOSE_ACK header addressed to `session_id`.
    fn session_close_ack_header(session_id: u32) -> CommonHeader {
        let mut header = CommonHeader::new(MessageType::SessionCloseAck, 16, 0);
        header.session_id = session_id;
        header
    }

    /// FLOW_UPDATE header addressed to `session_id`.
    fn flow_update_header(session_id: u32) -> CommonHeader {
        let mut header = CommonHeader::new(MessageType::FlowUpdate, 32, 0);
        header.session_id = session_id;
        header
    }

    /// Successful (`Opened`) SESSION_OPEN_ACK fixture for `session_id`.
    fn open_ack(session_id: u32) -> SessionOpenAckMetadata {
        SessionOpenAckMetadata {
            session_id,
            accepted_profile_id: 2,
            accepted_priority_class: SessionPriorityClass::Balanced,
            session_status: SessionStatus::Opened,
            schema_id: 0x1001,
            schema_version: 3,
            granted_operation_credit: 2,
            max_in_flight_operations: 4,
            lease_ttl_ms: 30_000,
            resume_window_ms: 120_000,
            resume_token_bytes: 16,
            session_extension_bytes: 8,
            server_session_tag: 0x0fed_cba9_8765_4321,
            route_scope_id: 7,
            session_error_code: 0,
            session_flags_ack: 5,
        }
    }

    /// SESSION_CLOSE metadata fixture carrying `last_operation_id`.
    fn close_metadata(last_operation_id: u64) -> SessionCloseMetadata {
        SessionCloseMetadata {
            close_reason: SessionCloseReason::ClientShutdown,
            in_flight_policy: InFlightPolicy::Drain,
            drain_timeout_ms: 1000,
            last_operation_id,
            session_error_code: 0,
            session_close_tag: 0x1122_3344,
        }
    }

    /// SESSION_CLOSE_ACK metadata fixture with the given status and operation id.
    fn close_ack(
        close_status: SessionCloseStatus,
        last_operation_id: u64,
    ) -> SessionCloseAckMetadata {
        SessionCloseAckMetadata {
            close_status,
            last_operation_id,
            session_error_code: 0,
        }
    }

    /// FLOW_UPDATE metadata fixture granting one session credit in the given scope.
    fn flow_update(scope_kind: FlowScopeKind, operation_id: u64) -> FlowUpdateMetadata {
        FlowUpdateMetadata {
            scope_kind,
            update_reason: FlowUpdateReason::Grant,
            backpressure_level: BackpressureLevel::None,
            connection_credit: 0,
            session_credit: 1,
            operation_credit: 0,
            operation_id,
            retry_after_ms: 0,
            credit_epoch: 1,
            flow_flags: 1,
        }
    }
}