1use super::{Frame, FrameType, ProtocolError, validate_stream};
2
/// Lifecycle phase of a protocol connection.
///
/// Every state admits a fixed set of frame types. Frame types reported as
/// `FrameType::Unknown(_)` are admitted in every state except [`Closed`] and
/// never change the state.
///
/// [`Closed`]: ConnectionState::Closed
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Admits `Connect`, which moves the connection to `Authenticating`.
    Connecting,
    /// Admits `ConnectAck` (moves to `Active`) and `ConnectError` (moves to
    /// `Closed`).
    Authenticating,
    /// Admits the subscribe, publish, conversation, accept/defer/reject and
    /// ping/pong frame types; `Disconnect` moves the connection to `Closing`.
    Active,
    /// Admits `Disconnect`, which moves the connection to `Closed`.
    Closing,
    /// Terminal state: every frame type is refused.
    Closed,
}
17
18impl ConnectionState {
19 pub fn validate_frame_type(self, frame_type: FrameType) -> Result<(), ProtocolError> {
29 if self.accepts(frame_type) {
30 Ok(())
31 } else {
32 Err(ProtocolError::invalid_state_transition(self, frame_type))
33 }
34 }
35
36 pub fn transition(self, frame: &Frame) -> Result<Self, ProtocolError> {
43 validate_stream(frame.frame_type(), frame.stream_id())?;
44 self.transition_frame_type(frame.frame_type())
45 }
46
47 pub fn transition_frame_type(self, frame_type: FrameType) -> Result<Self, ProtocolError> {
54 self.validate_frame_type(frame_type)?;
55 Ok(self.next_state(frame_type))
56 }
57
58 pub fn handle_frame(self, frame: &Frame) -> Result<LifecycleAction, ProtocolError> {
68 let next_state = self.transition(frame)?;
69 let response = if self == Self::Active && frame.frame_type() == FrameType::Ping {
70 Some(Frame::Pong { flags: 0 })
71 } else {
72 None
73 };
74
75 Ok(LifecycleAction {
76 state: next_state,
77 response,
78 })
79 }
80
81 const fn accepts(self, frame_type: FrameType) -> bool {
82 if matches!(self, Self::Closed) {
83 return false;
84 }
85
86 if matches!(frame_type, FrameType::Unknown(_)) {
87 return true;
88 }
89
90 match self {
91 Self::Connecting => matches!(frame_type, FrameType::Connect),
92 Self::Authenticating => {
93 matches!(frame_type, FrameType::ConnectAck | FrameType::ConnectError)
94 }
95 Self::Active => Self::accepts_active(frame_type),
96 Self::Closing => matches!(frame_type, FrameType::Disconnect),
97 Self::Closed => false,
98 }
99 }
100
101 const fn accepts_active(frame_type: FrameType) -> bool {
102 matches!(
103 frame_type,
104 FrameType::Disconnect
105 | FrameType::Subscribe
106 | FrameType::SubscribeAck
107 | FrameType::SubscribeError
108 | FrameType::Unsubscribe
109 | FrameType::Publish
110 | FrameType::PublishAck
111 | FrameType::PublishError
112 | FrameType::ConversationOpen
113 | FrameType::ConversationMessage
114 | FrameType::ConversationClose
115 | FrameType::ConversationError
116 | FrameType::Accept
117 | FrameType::Defer
118 | FrameType::Reject
119 | FrameType::Ping
120 | FrameType::Pong
121 )
122 }
123
124 const fn next_state(self, frame_type: FrameType) -> Self {
125 match (self, frame_type) {
126 (Self::Connecting, FrameType::Connect) => Self::Authenticating,
127 (Self::Authenticating, FrameType::ConnectAck) => Self::Active,
128 (Self::Authenticating, FrameType::ConnectError)
129 | (Self::Closing, FrameType::Disconnect) => Self::Closed,
130 (Self::Active, FrameType::Disconnect) => Self::Closing,
131 _ => self,
132 }
133 }
134}
135
136#[derive(Clone, Debug, PartialEq, Eq)]
138pub struct LifecycleAction {
139 pub state: ConnectionState,
141 pub response: Option<Frame>,
143}
144
#[cfg(test)]
mod tests {
    use super::ConnectionState;
    use crate::protocol::{Frame, FrameType, ProtocolError, ProtocolVersion, validate_stream};

    /// Returns `true` when `outcome` is an `InvalidStateTransition` error that
    /// names exactly `state` and `frame_type`.
    fn is_rejection<T>(
        outcome: &Result<T, ProtocolError>,
        state: ConnectionState,
        frame_type: FrameType,
    ) -> bool {
        match outcome {
            Err(ProtocolError::InvalidStateTransition {
                current_state,
                frame_type: rejected,
                ..
            }) => *current_state == state && *rejected == frame_type,
            _ => false,
        }
    }

    /// Asserts that applying `frame` in state `from` succeeds and lands in `to`.
    fn assert_transition(
        from: ConnectionState,
        frame: &Frame,
        to: ConnectionState,
    ) -> Result<(), ProtocolError> {
        assert_eq!(from.transition(frame)?, to);
        Ok(())
    }

    #[test]
    fn connecting_rejects_publish_with_state_and_frame_type() {
        let outcome = ConnectionState::Connecting.validate_frame_type(FrameType::Publish);

        assert!(is_rejection(
            &outcome,
            ConnectionState::Connecting,
            FrameType::Publish
        ));
    }

    #[test]
    fn active_accepts_subscribe() -> Result<(), ProtocolError> {
        ConnectionState::Active.validate_frame_type(FrameType::Subscribe)?;
        Ok(())
    }

    #[test]
    fn connecting_rejects_ping_with_state_and_frame_type() {
        let outcome = ConnectionState::Connecting.validate_frame_type(FrameType::Ping);

        assert!(is_rejection(
            &outcome,
            ConnectionState::Connecting,
            FrameType::Ping
        ));
    }

    #[test]
    fn connect_transitions_to_authenticating() -> Result<(), ProtocolError> {
        let connect = Frame::Connect {
            flags: 0,
            min_version: ProtocolVersion::new(1, 0),
            max_version: ProtocolVersion::new(2, 0),
            auth_token: vec![1, 2, 3],
        };

        assert_transition(
            ConnectionState::Connecting,
            &connect,
            ConnectionState::Authenticating,
        )
    }

    #[test]
    fn connect_ack_transitions_authenticating_to_active() -> Result<(), ProtocolError> {
        let connect_ack = Frame::ConnectAck {
            flags: 0,
            selected_version: ProtocolVersion::new(1, 0),
            capabilities: 0,
        };

        assert_transition(
            ConnectionState::Authenticating,
            &connect_ack,
            ConnectionState::Active,
        )
    }

    #[test]
    fn disconnect_ack_in_closing_transitions_to_closed() -> Result<(), ProtocolError> {
        assert_transition(
            ConnectionState::Closing,
            &Frame::Disconnect { flags: 0 },
            ConnectionState::Closed,
        )
    }

    #[test]
    fn closed_accepts_nothing() {
        // Covers one known and one unknown frame type.
        for frame_type in [FrameType::Disconnect, FrameType::Unknown(0xFE)] {
            let outcome = ConnectionState::Closed.validate_frame_type(frame_type);

            assert!(is_rejection(&outcome, ConnectionState::Closed, frame_type));
        }
    }

    #[test]
    fn ping_on_stream_zero_in_active_is_valid() -> Result<(), ProtocolError> {
        assert_transition(
            ConnectionState::Active,
            &Frame::Ping { flags: 0 },
            ConnectionState::Active,
        )
    }

    #[test]
    fn ping_on_stream_one_in_active_returns_invalid_stream() {
        let is_invalid_stream = matches!(
            validate_stream(FrameType::Ping, 1),
            Err(ProtocolError::InvalidStream { .. })
        );

        assert!(is_invalid_stream);
    }

    #[test]
    fn ping_in_connecting_returns_invalid_state_transition() {
        let ping = Frame::Ping { flags: 0 };
        let outcome = ConnectionState::Connecting.transition(&ping);

        assert!(is_rejection(
            &outcome,
            ConnectionState::Connecting,
            FrameType::Ping
        ));
    }

    #[test]
    fn ping_generates_pong_response_on_stream_zero() -> Result<(), ProtocolError> {
        // Non-zero ping flags: the expected pong still carries `flags: 0`.
        let ping = Frame::Ping { flags: 7 };
        let action = ConnectionState::Active.handle_frame(&ping)?;

        assert_eq!(action.state, ConnectionState::Active);
        assert_eq!(action.response, Some(Frame::Pong { flags: 0 }));
        assert_eq!(
            action.response.as_ref().map(|pong| pong.stream_id()),
            Some(0)
        );
        Ok(())
    }

    #[test]
    fn pong_frames_carry_no_payload_semantically() -> Result<(), ProtocolError> {
        let pong = Frame::Pong { flags: 0 };
        let action = ConnectionState::Active.handle_frame(&pong)?;

        assert_eq!(action.state, ConnectionState::Active);
        assert_eq!(action.response, None);
        Ok(())
    }
}