stomp_parser/model/frames/
mod.rs

1#[macro_use]
2mod sender;
3#[macro_use]
4mod macros;
5
6mod utils;
7
8#[allow(non_snake_case)]
9#[allow(unused_parens)]
10#[allow(clippy::new_without_default)]
11pub mod client {
12    //! Implements the model for the frames that a STOMP client can send, as specified in
13    //! the [STOMP Protocol Specification,Version 1.2](https://stomp.github.io/stomp-specification-1.2.html).
14
15    use crate::model::headers::*;
16
17    frames! {
18        Client,
19        (
20            Abort,
21            "Aborts a transaction that has begun but not yet been committed.",
22            ABORT,
23            Client,
24            transaction: Transaction
25        ),
26        (
27            Ack,
28            "Acknowledges a received message.",
29            ACK,
30            Client,
31            id: Id,
32            transaction: Transaction,
33            (receipt: Receipt)
34        ),
35        (
36            Begin,
37            "Begins a transaction.",
38            BEGIN,
39            Client,
40            transaction: Transaction,
41            (receipt: Receipt)
42        ),
43        (
44            Commit,
45            "Commits a transaction.",
46            COMMIT,
47            Client,
48            transaction: Transaction,
49            (receipt: Receipt)
50        ),
51        (
52            Connect,
53            "Initiates a STOMP session.",
54            CONNECT|STOMP,
55            Client,
56            host: Host,
57            accept_version: AcceptVersion,
58            (
59                heartbeat: HeartBeat: (||HeartBeatValue::new(HeartBeatIntervals::new(0,0))):"(0,0)",
60                login: Login,
61                passcode: Passcode
62            ),
63            [custom: cus],
64            "See [CONNECT Frame](https://stomp.github.io/stomp-specification-1.2.html#CONNECT_or_STOMP_Frame)."
65        ),
66        (
67            Disconnect,
68            "Ends a STOMP session.",
69            DISCONNECT,
70            Client,
71            receipt: Receipt
72        ),
73        (
74            Nack,
75            "Indicates that the client did not, or could not, process a message.",
76            NACK,
77            Client,
78            id: Id,
79            transaction: Transaction,
80            (receipt: Receipt)
81        ),
82        (
83            Send,
84            "Sends a message to a specific destination.",
85            SEND,
86            Client,
87            destination: Destination,
88            (
89                content_type: ContentType,
90                content_length: ContentLength,
91                transaction: Transaction,
92                receipt: Receipt
93            ),
94            [custom: cus],
95            [body: body]
96        ),
97        (
98            Subscribe,
99            "Subscribes to a specific destination.",
100            SUBSCRIBE,
101            Client,
102            destination: Destination,
103            id: Id,
104            (
105                ack_type: Ack: (||AckValue::new(AckType::Auto)):"Auto",
106                receipt: Receipt
107            ),
108            [custom: cus]
109        ),
110        (
111            Unsubscribe,
112            "Cancels a specific subscription.",
113            UNSUBSCRIBE,
114            Client,
115            id: Id,
116            (receipt: Receipt)
117        )
118    }
119
120    impl<'a> SendFrame<'a> {}
121}
122
123#[allow(non_snake_case)]
124#[allow(unused_parens)]
125#[allow(clippy::new_without_default)]
126pub mod server {
127    //! Implements the model for the frames that a STOMP server can send, as specified in the
128    //! [STOMP Protocol Specification,Version 1.2](https://stomp.github.io/stomp-specification-1.2.html).
129    use crate::model::headers::*;
130    frames! {
131        Server,
132        (
133            Connected,
134            CONNECTED,
135            Server,
136            version: Version,
137            (
138                heartbeat: HeartBeat,
139                session: Session, server: Server
140            )
141        ),
142        (
143            Receipt,
144            RECEIPT,
145            Server,
146            receipt_id: ReceiptId
147        ),
148        (
149            Error,
150            ERROR,
151            Server,
152            (message: Message),
153            [custom: cus],
154            [body: body]),
155        (
156            Message,
157            MESSAGE,
158            Server,
159            message_id: MessageId,
160            destination: Destination,
161            subscription: Subscription,
162            (
163                content_type: ContentType,
164                content_length: ContentLength
165            ),
166            [custom: cus],
167            [body: body]
168        )
169    }
170
171    impl<'a> ErrorFrame<'a> {
172        pub fn from_message(message: &str) -> Self {
173            ErrorFrameBuilder::new().message(message.to_owned()).build()
174        }
175    }
176}
177
178#[cfg(test)]
179#[macro_use]
180mod test {
181    use super::client::*;
182    use super::server::*;
183
184    use crate::model::headers::*;
185    use std::convert::TryFrom;
186    use std::convert::TryInto;
187    use std::str::FromStr;
188    use std::thread;
189
190    #[test]
191    fn new_builder_can_be_build() {
192        let frame = SendFrameBuilder::new("foo/bar".to_owned()).build();
193
194        assert_eq!("foo/bar", frame.destination().value());
195    }
196
197    #[test]
198    fn parses_stomp_frame() {
199        let result = ClientFrame::try_from(
200            "STOMP\nhost:foo\naccept-version:1.1\nheart-beat:10,20\n\n\u{00}"
201                .as_bytes()
202                .to_owned(),
203        );
204
205        if let Ok(ClientFrame::Connect(frame)) = result {
206            assert_eq!(StompVersion::V1_1, frame.accept_version().value().0[0])
207        } else {
208            panic!("Expected a connect frame")
209        }
210    }
211
212    #[test]
213    fn parses_connect_with_custom_headers() {
214        let result = ClientFrame::try_from(
215            "CONNECT\nhost:foo\naccept-version:1.1\nheart-beat:10,20\nfoo:bar\n\n\u{00}"
216                .as_bytes()
217                .to_owned(),
218        );
219
220        if let Ok(ClientFrame::Connect(frame)) = result {
221            assert_eq!(1, frame.custom.len());
222            assert_eq!("foo", frame.custom[0].header_name());
223            assert_eq!("bar", frame.custom[0].value().to_owned());
224        } else {
225            panic!("Expected a connect frame")
226        }
227    }
228
229    #[test]
230    fn writes_connect_with_custom_headers() {
231        let frame = ConnectFrameBuilder::new(
232            "foo".to_owned(),
233            StompVersions::from_str("1.1,1.2").unwrap(),
234        )
235        .add_custom_header("foo".to_owned(), "bar".to_owned())
236        .build();
237
238        let displayed = String::from_utf8(frame.into()).unwrap();
239
240        assert_eq!(
241            "CONNECT\nhost:foo\naccept-version:1.1,1.2\nheart-beat:0,0\nfoo:bar\n\n\x00",
242            displayed
243        );
244    }
245
246    #[test]
247    fn builds_connected_frame() {
248        let frame = ConnectedFrameBuilder::new(StompVersion::V1_1)
249            .heartbeat(HeartBeatIntervals {
250                supplied: 20,
251                expected: 10,
252            })
253            .build();
254
255        assert_eq!(StompVersion::V1_1, *(frame.version().value()));
256        assert_eq!(20, frame.heartbeat().unwrap().value().supplied);
257        assert_eq!(10, frame.heartbeat().unwrap().value().expected);
258    }
259
260    #[test]
261    fn writes_connected_frame() {
262        let frame = ConnectedFrameBuilder::new(StompVersion::V1_1)
263            .heartbeat(HeartBeatIntervals {
264                supplied: 20,
265                expected: 10,
266            })
267            .build();
268
269        let displayed: Vec<u8> = frame.into();
270
271        assert_eq!(
272            b"CONNECTED\nversion:1.1\nheart-beat:20,10\n\n\x00".to_vec(),
273            displayed
274        );
275    }
276
277    #[test]
278    fn builds_receipt_frame() {
279        let frame = ReceiptFrameBuilder::new("rcpt-1".to_owned()).build();
280
281        assert_eq!("rcpt-1", frame.receipt_id().value());
282    }
283
284    #[test]
285    fn writes_message_frame() {
286        let body = b"Lorem ipsum dolor sit amet,".to_vec();
287
288        let frame = MessageFrameBuilder::new(
289            "msg-1".to_owned(),
290            "path/to/hell".to_owned(),
291            "annual".to_owned(),
292        )
293        .content_type("foo/bar".to_owned())
294        .body(body)
295        .build();
296
297        assert_message_frame_roundtrip(
298            frame,
299            "msg-1",
300            "path/to/hell",
301            "annual",
302            Some("foo/bar"),
303            None,
304            &vec![],
305            Some(b"Lorem ipsum dolor sit amet,"),
306        );
307    }
308
309    #[test]
310    fn writes_custom_headers() {
311        let body = b"Lorem ipsum dolor sit amet,".to_vec();
312
313        let frame = MessageFrameBuilder::new(
314            "msg-1".to_owned(),
315            "path/to/hell".to_owned(),
316            "annual".to_owned(),
317        )
318        .content_type("foo/bar".to_owned())
319        .add_custom_header("hello".to_owned(), "world".to_owned())
320        .body(body)
321        .build();
322
323        assert_message_frame_roundtrip(
324            frame,
325            "msg-1",
326            "path/to/hell",
327            "annual",
328            Some("foo/bar"),
329            None,
330            &vec![("hello", "world")],
331            Some(b"Lorem ipsum dolor sit amet,"),
332        );
333    }
334
335    fn assert_message_frame_roundtrip(
336        frame: MessageFrame,
337        expected_id: &str,
338        expected_dest: &str,
339        expected_sub: &str,
340        expected_content_type: Option<&str>,
341        expected_content_length: Option<u32>,
342        expected_custom: &Vec<(&str, &str)>,
343        expected_body: Option<&[u8]>,
344    ) {
345        assert_message_frame(
346            &frame,
347            expected_id,
348            expected_dest,
349            expected_sub,
350            expected_content_type,
351            expected_content_length,
352            expected_custom,
353            expected_body,
354        );
355
356        let bytes: Vec<u8> = frame.try_into().expect("Error writing bytes");
357
358        if let Ok(ServerFrame::Message(frame)) = ServerFrame::try_from(bytes) {
359            assert_message_frame(
360                &frame,
361                expected_id,
362                expected_dest,
363                expected_sub,
364                expected_content_type,
365                expected_content_length,
366                expected_custom,
367                expected_body,
368            );
369        } else {
370            panic!("Should have received a Message frame")
371        }
372    }
373
374    fn assert_message_frame(
375        frame: &MessageFrame,
376        expected_id: &str,
377        expected_dest: &str,
378        expected_sub: &str,
379        expected_content_type: Option<&str>,
380        expected_content_length: Option<u32>,
381        expected_custom: &Vec<(&str, &str)>,
382        expected_body: Option<&[u8]>,
383    ) {
384        assert_eq!(
385            frame.message_id().value(),
386            expected_id,
387            "MessageId does not match"
388        );
389        assert_eq!(
390            frame.destination().value(),
391            expected_dest,
392            "Destination does not match"
393        );
394        assert_eq!(
395            frame.subscription().value(),
396            expected_sub,
397            "Subscription does not match"
398        );
399        assert_eq!(
400            frame.content_type().as_ref().map(|value| value.value()),
401            expected_content_type,
402            "content-type does not match"
403        );
404
405        assert_eq!(
406            frame.content_length().as_ref().map(|value| value.value()),
407            expected_content_length.as_ref(),
408            "content-length does not match"
409        );
410        expected_custom.iter().for_each(|(name, value)| {
411            assert!(
412                frame
413                    .custom
414                    .iter()
415                    .any(|custom_value| custom_value.header_name() == *name
416                        && custom_value.value() == value),
417                "Missing custom value {}:{}",
418                name,
419                value
420            );
421        });
422
423        assert_eq!(frame.body(), expected_body, "Body does not match");
424    }
425
426    #[test]
427    fn writes_binary_message_frame() {
428        let body = vec![0, 1, 1, 2, 3, 5, 8, 13];
429
430        let frame = MessageFrameBuilder::new(
431            "msg-1".to_owned(),
432            "path/to/hell".to_owned(),
433            "annual".to_owned(),
434        )
435        .content_type("foo/bar".to_owned())
436        .body(body)
437        .build();
438
439        assert_message_frame_roundtrip(
440            frame,
441            "msg-1",
442            "path/to/hell",
443            "annual",
444            Some("foo/bar"),
445            None,
446            &vec![],
447            Some(&[0, 1, 1, 2, 3, 5, 8, 13]),
448        );
449    }
450
451    #[test]
452    fn parses_send_frame() {
453        let message = b"SEND\n\
454            destination:stairway/to/heaven\n\
455            \n\
456            Lorem ipsum dolor sit amet,...\x00"
457            .to_vec();
458
459        if let Ok(ClientFrame::Send(frame)) = ClientFrame::try_from(message) {
460            assert_eq!(
461                "Lorem ipsum dolor sit amet,...",
462                std::str::from_utf8(frame.body().unwrap()).unwrap()
463            );
464        } else {
465            panic!("Send Frame not parsed correctly");
466        }
467    }
468
469    fn assert_in_range(ptr: *const u8, len: usize, actual: *const u8) {
470        let offset = unsafe { actual.offset_from(ptr) };
471
472        if offset < 0 || offset > (len as isize) {
473            panic!("offset {} not in range of {}", offset, len);
474        }
475    }
476
477    #[test]
478    fn does_not_copy() {
479        let message = b"SEND\n\
480            destination:stairway/to/heaven\n\
481            funky:doodle\n\
482            \n\
483            Lorem ipsum dolor sit amet,...\x00"
484            .to_vec();
485
486        let source_ptr = message.as_ptr();
487        let source_len = message.len();
488
489        let Ok(ClientFrame::Send(frame)) = ClientFrame::try_from(message) else {
490            panic!("Send Frame not parsed correctly")
491        };
492
493        assert_in_range(source_ptr, source_len, frame.body().unwrap().as_ptr());
494        assert_in_range(source_ptr, source_len, frame.destination().value().as_ptr());
495        assert_in_range(source_ptr, source_len, frame.custom[0].value().as_ptr());
496        assert_in_range(
497            source_ptr,
498            source_len,
499            frame.custom[0].header_name().as_ptr(),
500        );
501    }
502
503    #[test]
504    fn works_after_move() {
505        let message = b"SEND\n\
506            destination:stairway/to/heaven\n\
507            \n\
508            Lorem ipsum dolor sit amet,...\x00"
509            .to_vec();
510
511        let src_ptr = message.as_ptr() as u64;
512        let len = message.len();
513        let parsed = ClientFrame::try_from(message);
514
515        let handle = thread::spawn(move || {
516            let Ok(ClientFrame::Send(frame)) = parsed else {
517                panic!("Send Frame not parsed correctly")
518            };
519
520            assert_eq!(
521                "Lorem ipsum dolor sit amet,...",
522                std::str::from_utf8(frame.body().unwrap()).unwrap()
523            );
524
525            assert_eq!("stairway/to/heaven", frame.destination().value());
526            return frame.body().unwrap().as_ptr() as u64;
527        });
528
529        let Ok(address) = handle.join() else {
530            panic!("Error after move")
531        };
532
533        println!(
534            "Source: {}, Len: {}, Offset: {} ",
535            src_ptr,
536            len,
537            address - src_ptr,
538        );
539    }
540
541    #[test]
542    fn parses_binary_send_frame() {
543        let message = b"SEND\n\
544            destination:stairway/to/heaven\n\
545            \n\
546            \x00\x01\x01\x02\x03\x05\x08\x0d\
547            \x00"
548            .to_vec();
549
550        let Ok(ClientFrame::Send(frame)) = ClientFrame::try_from(message) else {
551            panic!("Send Frame not parsed correctly")
552        };
553
554        assert_eq!(&[0u8, 1, 1, 2, 3, 5, 8, 13], frame.body().unwrap());
555    }
556}