Skip to main content

liminal/protocol/
lifecycle.rs

1use super::{Frame, FrameType, ProtocolError, validate_stream};
2
3/// Connection-level lifecycle states enforced by the protocol.
4#[derive(Clone, Copy, Debug, PartialEq, Eq)]
5pub enum ConnectionState {
6    /// Waiting for the client Connect frame.
7    Connecting,
8    /// Waiting for the server `ConnectAck` or `ConnectError` frame.
9    Authenticating,
10    /// Handshake complete; application and keepalive frames may flow.
11    Active,
12    /// Disconnect has been sent; waiting for the peer's Disconnect acknowledgement.
13    Closing,
14    /// Connection is closed and accepts no further frames.
15    Closed,
16}
17
18impl ConnectionState {
19    /// Validate that a frame type is accepted in this state.
20    ///
21    /// Unknown frame types are forward-compatible and are allowed to pass through
22    /// lifecycle validation after codec length-delimited skipping.
23    ///
24    /// # Errors
25    ///
26    /// Returns [`ProtocolError::InvalidStateTransition`] when `frame_type` is not
27    /// accepted by this state.
28    pub fn validate_frame_type(self, frame_type: FrameType) -> Result<(), ProtocolError> {
29        if self.accepts(frame_type) {
30            Ok(())
31        } else {
32            Err(ProtocolError::invalid_state_transition(self, frame_type))
33        }
34    }
35
36    /// Validate a frame and return the next lifecycle state.
37    ///
38    /// # Errors
39    ///
40    /// Returns [`ProtocolError::InvalidStream`] for invalid stream placement and
41    /// [`ProtocolError::InvalidStateTransition`] for out-of-sequence frames.
42    pub fn transition(self, frame: &Frame) -> Result<Self, ProtocolError> {
43        validate_stream(frame.frame_type(), frame.stream_id())?;
44        self.transition_frame_type(frame.frame_type())
45    }
46
47    /// Validate a frame type and return the next lifecycle state.
48    ///
49    /// # Errors
50    ///
51    /// Returns [`ProtocolError::InvalidStateTransition`] when `frame_type` is not
52    /// accepted by this state.
53    pub fn transition_frame_type(self, frame_type: FrameType) -> Result<Self, ProtocolError> {
54        self.validate_frame_type(frame_type)?;
55        Ok(self.next_state(frame_type))
56    }
57
58    /// Validate and handle a frame, returning the next state and any protocol response.
59    ///
60    /// Active Ping frames produce a Pong response on stream 0. This function does
61    /// not perform transport I/O.
62    ///
63    /// # Errors
64    ///
65    /// Returns [`ProtocolError::InvalidStream`] for invalid stream placement and
66    /// [`ProtocolError::InvalidStateTransition`] for out-of-sequence frames.
67    pub fn handle_frame(self, frame: &Frame) -> Result<LifecycleAction, ProtocolError> {
68        let next_state = self.transition(frame)?;
69        let response = if self == Self::Active && frame.frame_type() == FrameType::Ping {
70            Some(Frame::Pong { flags: 0 })
71        } else {
72            None
73        };
74
75        Ok(LifecycleAction {
76            state: next_state,
77            response,
78        })
79    }
80
81    const fn accepts(self, frame_type: FrameType) -> bool {
82        if matches!(self, Self::Closed) {
83            return false;
84        }
85
86        if matches!(frame_type, FrameType::Unknown(_)) {
87            return true;
88        }
89
90        match self {
91            Self::Connecting => matches!(frame_type, FrameType::Connect),
92            Self::Authenticating => {
93                matches!(frame_type, FrameType::ConnectAck | FrameType::ConnectError)
94            }
95            Self::Active => Self::accepts_active(frame_type),
96            Self::Closing => matches!(frame_type, FrameType::Disconnect),
97            Self::Closed => false,
98        }
99    }
100
101    const fn accepts_active(frame_type: FrameType) -> bool {
102        matches!(
103            frame_type,
104            FrameType::Disconnect
105                | FrameType::Subscribe
106                | FrameType::SubscribeAck
107                | FrameType::SubscribeError
108                | FrameType::Unsubscribe
109                | FrameType::Publish
110                | FrameType::PublishAck
111                | FrameType::PublishError
112                | FrameType::ConversationOpen
113                | FrameType::ConversationMessage
114                | FrameType::ConversationClose
115                | FrameType::ConversationError
116                | FrameType::Accept
117                | FrameType::Defer
118                | FrameType::Reject
119                | FrameType::Ping
120                | FrameType::Pong
121        )
122    }
123
124    const fn next_state(self, frame_type: FrameType) -> Self {
125        match (self, frame_type) {
126            (Self::Connecting, FrameType::Connect) => Self::Authenticating,
127            (Self::Authenticating, FrameType::ConnectAck) => Self::Active,
128            (Self::Authenticating, FrameType::ConnectError)
129            | (Self::Closing, FrameType::Disconnect) => Self::Closed,
130            (Self::Active, FrameType::Disconnect) => Self::Closing,
131            _ => self,
132        }
133    }
134}
135
136/// Result of lifecycle frame handling.
137#[derive(Clone, Debug, PartialEq, Eq)]
138pub struct LifecycleAction {
139    /// State after accepting the frame.
140    pub state: ConnectionState,
141    /// Optional protocol response generated by handling the frame.
142    pub response: Option<Frame>,
143}
144
145#[cfg(test)]
146mod tests {
147    use super::ConnectionState;
148    use crate::protocol::{Frame, FrameType, ProtocolError, ProtocolVersion, validate_stream};
149
150    #[test]
151    fn connecting_rejects_publish_with_state_and_frame_type() {
152        let result = ConnectionState::Connecting.validate_frame_type(FrameType::Publish);
153
154        assert!(matches!(
155            result,
156            Err(ProtocolError::InvalidStateTransition {
157                current_state: ConnectionState::Connecting,
158                frame_type: FrameType::Publish,
159                ..
160            })
161        ));
162    }
163
164    #[test]
165    fn active_accepts_subscribe() -> Result<(), ProtocolError> {
166        ConnectionState::Active.validate_frame_type(FrameType::Subscribe)
167    }
168
169    #[test]
170    fn connecting_rejects_ping_with_state_and_frame_type() {
171        let result = ConnectionState::Connecting.validate_frame_type(FrameType::Ping);
172
173        assert!(matches!(
174            result,
175            Err(ProtocolError::InvalidStateTransition {
176                current_state: ConnectionState::Connecting,
177                frame_type: FrameType::Ping,
178                ..
179            })
180        ));
181    }
182
183    #[test]
184    fn connect_transitions_to_authenticating() -> Result<(), ProtocolError> {
185        let frame = Frame::Connect {
186            flags: 0,
187            min_version: ProtocolVersion::new(1, 0),
188            max_version: ProtocolVersion::new(2, 0),
189            auth_token: vec![1, 2, 3],
190        };
191
192        assert_eq!(
193            ConnectionState::Connecting.transition(&frame)?,
194            ConnectionState::Authenticating
195        );
196        Ok(())
197    }
198
199    #[test]
200    fn connect_ack_transitions_authenticating_to_active() -> Result<(), ProtocolError> {
201        let frame = Frame::ConnectAck {
202            flags: 0,
203            selected_version: ProtocolVersion::new(1, 0),
204            capabilities: 0,
205        };
206
207        assert_eq!(
208            ConnectionState::Authenticating.transition(&frame)?,
209            ConnectionState::Active
210        );
211        Ok(())
212    }
213
214    #[test]
215    fn disconnect_ack_in_closing_transitions_to_closed() -> Result<(), ProtocolError> {
216        let frame = Frame::Disconnect { flags: 0 };
217
218        assert_eq!(
219            ConnectionState::Closing.transition(&frame)?,
220            ConnectionState::Closed
221        );
222        Ok(())
223    }
224
225    #[test]
226    fn closed_accepts_nothing() {
227        for frame_type in [FrameType::Disconnect, FrameType::Unknown(0xFE)] {
228            let result = ConnectionState::Closed.validate_frame_type(frame_type);
229
230            assert!(matches!(
231                result,
232                Err(ProtocolError::InvalidStateTransition {
233                    current_state: ConnectionState::Closed,
234                    frame_type: rejected,
235                    ..
236                }) if rejected == frame_type
237            ));
238        }
239    }
240
241    #[test]
242    fn ping_on_stream_zero_in_active_is_valid() -> Result<(), ProtocolError> {
243        let frame = Frame::Ping { flags: 0 };
244
245        assert_eq!(
246            ConnectionState::Active.transition(&frame)?,
247            ConnectionState::Active
248        );
249        Ok(())
250    }
251
252    #[test]
253    fn ping_on_stream_one_in_active_returns_invalid_stream() {
254        let result = validate_stream(FrameType::Ping, 1);
255
256        assert!(matches!(result, Err(ProtocolError::InvalidStream { .. })));
257    }
258
259    #[test]
260    fn ping_in_connecting_returns_invalid_state_transition() {
261        let frame = Frame::Ping { flags: 0 };
262        let result = ConnectionState::Connecting.transition(&frame);
263
264        assert!(matches!(
265            result,
266            Err(ProtocolError::InvalidStateTransition {
267                current_state: ConnectionState::Connecting,
268                frame_type: FrameType::Ping,
269                ..
270            })
271        ));
272    }
273
274    #[test]
275    fn ping_generates_pong_response_on_stream_zero() -> Result<(), ProtocolError> {
276        let frame = Frame::Ping { flags: 7 };
277        let action = ConnectionState::Active.handle_frame(&frame)?;
278
279        assert_eq!(action.state, ConnectionState::Active);
280        assert_eq!(action.response, Some(Frame::Pong { flags: 0 }));
281        assert_eq!(action.response.as_ref().map(Frame::stream_id), Some(0));
282        Ok(())
283    }
284
285    #[test]
286    fn pong_frames_carry_no_payload_semantically() -> Result<(), ProtocolError> {
287        let frame = Frame::Pong { flags: 0 };
288        let action = ConnectionState::Active.handle_frame(&frame)?;
289
290        assert_eq!(action.state, ConnectionState::Active);
291        assert_eq!(action.response, None);
292        Ok(())
293    }
294}