1#[macro_use]
2mod sender;
3#[macro_use]
4mod macros;
5
6mod utils;
7
// Frames sent by a STOMP client.
//
// All frame types in this module are produced by a single `frames!` invocation; the macro
// itself is defined outside this block (presumably in `mod macros`, which is declared with
// `#[macro_use]` at the top of this file - confirm).
//
// NOTE(review): the three `allow` attributes below presumably silence lints triggered by the
// macro-generated code - confirm, and consider recording the reason next to each attribute.
8#[allow(non_snake_case)]
9#[allow(unused_parens)]
10#[allow(clippy::new_without_default)]
11pub mod client {
12 use crate::model::headers::*;
16
 // Shape of each parenthesised entry, as far as this file shows:
 //   Name, "description", COMMAND, Client,
 //   `field: HeaderType` items listed directly,
 //   an optional parenthesised group of further `field: HeaderType` items,
 //   optional `[custom: cus]` and `[body: body]` markers.
 //
 // How the tests in this file use the generated API:
 //   * directly listed headers are passed to `<Name>FrameBuilder::new(..)`
 //     (e.g. `SendFrameBuilder::new(destination)`),
 //   * headers inside the parenthesised group are set through builder methods
 //     (e.g. `.content_type(..)` on the MESSAGE builder in the server module),
 //   * `[custom: cus]` goes together with `.add_custom_header(..)` and a `custom` list,
 //   * `[body: body]` goes together with `.body(..)` / `frame.body()`.
 // Only plain `//` comments are used inside the invocation so that the tokens handed to the
 // macro stay unchanged.
17 frames! {
18 Client,
19 (
20 Abort,
21 "Aborts a transaction that has begun but not yet been committed.",
22 ABORT,
23 Client,
24 transaction: Transaction
25 ),
26 (
27 Ack,
28 "Acknowledges a received message.",
29 ACK,
30 Client,
31 id: Id,
 // NOTE(review): `transaction` is listed directly (not in the parenthesised group), i.e. in
 // the position used for constructor arguments elsewhere; STOMP 1.2 describes it as optional
 // for ACK/NACK - confirm this is intended. The same applies to `Nack` below.
32 transaction: Transaction,
33 (receipt: Receipt)
34 ),
35 (
36 Begin,
37 "Begins a transaction.",
38 BEGIN,
39 Client,
40 transaction: Transaction,
41 (receipt: Receipt)
42 ),
43 (
44 Commit,
45 "Commits a transaction.",
46 COMMIT,
47 Client,
48 transaction: Transaction,
49 (receipt: Receipt)
50 ),
51 (
52 Connect,
53 "Initiates a STOMP session.",
 // Two command names for one frame type: the tests parse a `STOMP` frame into
 // `ClientFrame::Connect` and expect a built frame to be written as `CONNECT`.
54 CONNECT|STOMP,
55 Client,
56 host: Host,
57 accept_version: AcceptVersion,
58 (
 // Header with a default: a closure producing the default value, followed by a string that is
 // presumably the default's textual description for generated docs - confirm. The tests expect
 // `heart-beat:0,0` to be written even when no heartbeat was set on the builder.
59 heartbeat: HeartBeat: (||HeartBeatValue::new(HeartBeatIntervals::new(0,0))):"(0,0)",
60 login: Login,
61 passcode: Passcode
62 ),
63 [custom: cus],
 // Trailing string: presumably extra documentation text attached to the generated frame type
 // - confirm against the macro definition.
64 "See [CONNECT Frame](https://stomp.github.io/stomp-specification-1.2.html#CONNECT_or_STOMP_Frame)."
65 ),
66 (
67 Disconnect,
68 "Ends a STOMP session.",
69 DISCONNECT,
70 Client,
 // NOTE(review): `receipt` is listed directly here, unlike every other frame in this module
 // where it sits inside the parenthesised group - confirm whether it should be optional.
71 receipt: Receipt
72 ),
73 (
74 Nack,
75 "Indicates that the client did not, or could not, process a message.",
76 NACK,
77 Client,
78 id: Id,
79 transaction: Transaction,
80 (receipt: Receipt)
81 ),
82 (
83 Send,
84 "Sends a message to a specific destination.",
85 SEND,
86 Client,
87 destination: Destination,
88 (
89 content_type: ContentType,
90 content_length: ContentLength,
91 transaction: Transaction,
92 receipt: Receipt
93 ),
94 [custom: cus],
95 [body: body]
96 ),
97 (
98 Subscribe,
99 "Subscribes to a specific destination.",
100 SUBSCRIBE,
101 Client,
102 destination: Destination,
103 id: Id,
104 (
 // Same "header with default" form as `heartbeat` on `Connect`; the default closure yields
 // `AckType::Auto`.
105 ack_type: Ack: (||AckValue::new(AckType::Auto)):"Auto",
106 receipt: Receipt
107 ),
108 [custom: cus]
109 ),
110 (
111 Unsubscribe,
112 "Cancels a specific subscription.",
113 UNSUBSCRIBE,
114 Client,
115 id: Id,
116 (receipt: Receipt)
117 )
118 }
119
 // Empty inherent impl: it adds no methods to `SendFrame`.
120 impl<'a> SendFrame<'a> {}
121}
122
// Frames sent by a STOMP server.
//
// As in the `client` module, every frame type here comes from one `frames!` invocation whose
// definition lives outside this block.
//
// NOTE(review): the three `allow` attributes below presumably silence lints triggered by the
// macro-generated code - confirm, and consider recording the reason next to each attribute.
123#[allow(non_snake_case)]
124#[allow(unused_parens)]
125#[allow(clippy::new_without_default)]
126pub mod server {
127 use crate::model::headers::*;
 // Unlike the entries in the `client` module, the entries below carry no description string
 // between the frame name and the command name.
 // Only plain `//` comments are used inside the invocation so that the tokens handed to the
 // macro stay unchanged.
130 frames! {
131 Server,
132 (
133 Connected,
134 CONNECTED,
135 Server,
136 version: Version,
137 (
 // No default is declared for `heartbeat` here; the tests read it back through
 // `frame.heartbeat().unwrap()`, so the generated getter is optional-valued.
138 heartbeat: HeartBeat,
 // `server: Server` names a header type; the `Server` two lines above the `version` field is
 // the frame-group marker that every entry in this invocation repeats.
139 session: Session, server: Server
140 )
141 ),
142 (
143 Receipt,
144 RECEIPT,
145 Server,
146 receipt_id: ReceiptId
147 ),
148 (
149 Error,
150 ERROR,
151 Server,
 // ERROR lists no headers directly, which matches `ErrorFrameBuilder::new()` being called
 // without arguments in `from_message` below.
152 (message: Message),
153 [custom: cus],
154 [body: body]),
155 (
156 Message,
157 MESSAGE,
158 Server,
159 message_id: MessageId,
160 destination: Destination,
161 subscription: Subscription,
162 (
163 content_type: ContentType,
164 content_length: ContentLength
165 ),
166 [custom: cus],
167 [body: body]
168 )
169 }
170
171 impl<'a> ErrorFrame<'a> {
 // Convenience constructor: builds an ERROR frame on which only `message` is set.
 // `message` is copied into an owned `String` before being handed to the builder; no custom
 // headers and no body are set.
172 pub fn from_message(message: &str) -> Self {
173 ErrorFrameBuilder::new().message(message.to_owned()).build()
174 }
175 }
176}
177
178#[cfg(test)]
179#[macro_use]
180mod test {
181 use super::client::*;
182 use super::server::*;
183
184 use crate::model::headers::*;
185 use std::convert::TryFrom;
186 use std::convert::TryInto;
187 use std::str::FromStr;
188 use std::thread;
189
/// A `SendFrameBuilder` that was only given a destination can be built, and the resulting
/// frame reports that destination.
#[test]
fn new_builder_can_be_build() {
    let destination = "foo/bar".to_owned();

    let send_frame = SendFrameBuilder::new(destination).build();

    assert_eq!("foo/bar", send_frame.destination().value());
}
196
/// A frame using the `STOMP` command parses into `ClientFrame::Connect`, and the first
/// accepted version is the one given in `accept-version`.
#[test]
fn parses_stomp_frame() {
    let raw = "STOMP\nhost:foo\naccept-version:1.1\nheart-beat:10,20\n\n\u{00}"
        .as_bytes()
        .to_owned();

    let Ok(ClientFrame::Connect(connect)) = ClientFrame::try_from(raw) else {
        panic!("Expected a connect frame")
    };

    assert_eq!(StompVersion::V1_1, connect.accept_version().value().0[0]);
}
211
/// A header that is not one of CONNECT's declared headers (`foo:bar`) is exposed through the
/// parsed frame's `custom` list.
#[test]
fn parses_connect_with_custom_headers() {
    let raw = "CONNECT\nhost:foo\naccept-version:1.1\nheart-beat:10,20\nfoo:bar\n\n\u{00}"
        .as_bytes()
        .to_owned();

    let Ok(ClientFrame::Connect(connect)) = ClientFrame::try_from(raw) else {
        panic!("Expected a connect frame")
    };

    // Check the count before indexing so a missing header fails on this assertion.
    assert_eq!(1, connect.custom.len());

    let header = &connect.custom[0];
    assert_eq!("foo", header.header_name());
    assert_eq!("bar", header.value().to_owned());
}
228
/// Serialising a CONNECT frame writes the declared headers, the default `heart-beat:0,0`,
/// then the custom header, followed by the blank line and the terminating NUL byte.
#[test]
fn writes_connect_with_custom_headers() {
    let accepted_versions = StompVersions::from_str("1.1,1.2").unwrap();

    let connect = ConnectFrameBuilder::new("foo".to_owned(), accepted_versions)
        .add_custom_header("foo".to_owned(), "bar".to_owned())
        .build();

    let wire_bytes: Vec<u8> = connect.into();
    let displayed = String::from_utf8(wire_bytes).unwrap();

    let expected = "CONNECT\nhost:foo\naccept-version:1.1,1.2\nheart-beat:0,0\nfoo:bar\n\n\x00";
    assert_eq!(expected, displayed);
}
245
/// A CONNECTED frame built with a version and heartbeat intervals exposes both unchanged.
#[test]
fn builds_connected_frame() {
    let intervals = HeartBeatIntervals {
        supplied: 20,
        expected: 10,
    };

    let connected = ConnectedFrameBuilder::new(StompVersion::V1_1)
        .heartbeat(intervals)
        .build();

    assert_eq!(StompVersion::V1_1, *(connected.version().value()));

    let heartbeat = connected.heartbeat().unwrap();
    assert_eq!(20, heartbeat.value().supplied);
    assert_eq!(10, heartbeat.value().expected);
}
259
/// Serialising a CONNECTED frame yields the command line, `version`, `heart-beat`, the blank
/// line and the terminating NUL byte.
#[test]
fn writes_connected_frame() {
    let intervals = HeartBeatIntervals {
        supplied: 20,
        expected: 10,
    };

    let connected = ConnectedFrameBuilder::new(StompVersion::V1_1)
        .heartbeat(intervals)
        .build();

    let wire_bytes: Vec<u8> = connected.into();

    let expected = b"CONNECTED\nversion:1.1\nheart-beat:20,10\n\n\x00".to_vec();
    assert_eq!(expected, wire_bytes);
}
276
/// A RECEIPT frame built from a receipt id reports that id.
#[test]
fn builds_receipt_frame() {
    let receipt_id = "rcpt-1".to_owned();

    let receipt = ReceiptFrameBuilder::new(receipt_id).build();

    assert_eq!("rcpt-1", receipt.receipt_id().value());
}
283
/// A MESSAGE frame with a text body and a content type keeps all of its values, both as
/// built and after being written to bytes and parsed back.
#[test]
fn writes_message_frame() {
    let payload: &[u8] = b"Lorem ipsum dolor sit amet,";
    let no_custom_headers = Vec::new();

    let message = MessageFrameBuilder::new(
        "msg-1".to_owned(),
        "path/to/hell".to_owned(),
        "annual".to_owned(),
    )
    .content_type("foo/bar".to_owned())
    .body(payload.to_vec())
    .build();

    assert_message_frame_roundtrip(
        message,
        "msg-1",
        "path/to/hell",
        "annual",
        Some("foo/bar"),
        None,
        &no_custom_headers,
        Some(payload),
    );
}
308
/// A custom header added to a MESSAGE frame is present on the built frame and is still
/// present after the frame has been written to bytes and parsed back.
#[test]
fn writes_custom_headers() {
    let payload: &[u8] = b"Lorem ipsum dolor sit amet,";
    let custom_headers = vec![("hello", "world")];

    let message = MessageFrameBuilder::new(
        "msg-1".to_owned(),
        "path/to/hell".to_owned(),
        "annual".to_owned(),
    )
    .content_type("foo/bar".to_owned())
    .add_custom_header("hello".to_owned(), "world".to_owned())
    .body(payload.to_vec())
    .build();

    assert_message_frame_roundtrip(
        message,
        "msg-1",
        "path/to/hell",
        "annual",
        Some("foo/bar"),
        None,
        &custom_headers,
        Some(payload),
    );
}
334
335 fn assert_message_frame_roundtrip(
336 frame: MessageFrame,
337 expected_id: &str,
338 expected_dest: &str,
339 expected_sub: &str,
340 expected_content_type: Option<&str>,
341 expected_content_length: Option<u32>,
342 expected_custom: &Vec<(&str, &str)>,
343 expected_body: Option<&[u8]>,
344 ) {
345 assert_message_frame(
346 &frame,
347 expected_id,
348 expected_dest,
349 expected_sub,
350 expected_content_type,
351 expected_content_length,
352 expected_custom,
353 expected_body,
354 );
355
356 let bytes: Vec<u8> = frame.try_into().expect("Error writing bytes");
357
358 if let Ok(ServerFrame::Message(frame)) = ServerFrame::try_from(bytes) {
359 assert_message_frame(
360 &frame,
361 expected_id,
362 expected_dest,
363 expected_sub,
364 expected_content_type,
365 expected_content_length,
366 expected_custom,
367 expected_body,
368 );
369 } else {
370 panic!("Should have received a Message frame")
371 }
372 }
373
374 fn assert_message_frame(
375 frame: &MessageFrame,
376 expected_id: &str,
377 expected_dest: &str,
378 expected_sub: &str,
379 expected_content_type: Option<&str>,
380 expected_content_length: Option<u32>,
381 expected_custom: &Vec<(&str, &str)>,
382 expected_body: Option<&[u8]>,
383 ) {
384 assert_eq!(
385 frame.message_id().value(),
386 expected_id,
387 "MessageId does not match"
388 );
389 assert_eq!(
390 frame.destination().value(),
391 expected_dest,
392 "Destination does not match"
393 );
394 assert_eq!(
395 frame.subscription().value(),
396 expected_sub,
397 "Subscription does not match"
398 );
399 assert_eq!(
400 frame.content_type().as_ref().map(|value| value.value()),
401 expected_content_type,
402 "content-type does not match"
403 );
404
405 assert_eq!(
406 frame.content_length().as_ref().map(|value| value.value()),
407 expected_content_length.as_ref(),
408 "content-length does not match"
409 );
410 expected_custom.iter().for_each(|(name, value)| {
411 assert!(
412 frame
413 .custom
414 .iter()
415 .any(|custom_value| custom_value.header_name() == *name
416 && custom_value.value() == value),
417 "Missing custom value {}:{}",
418 name,
419 value
420 );
421 });
422
423 assert_eq!(frame.body(), expected_body, "Body does not match");
424 }
425
/// A MESSAGE frame whose body holds arbitrary bytes (including a NUL byte) keeps its values,
/// both as built and after being written to bytes and parsed back.
#[test]
fn writes_binary_message_frame() {
    let payload: &[u8] = &[0, 1, 1, 2, 3, 5, 8, 13];
    let no_custom_headers = Vec::new();

    let message = MessageFrameBuilder::new(
        "msg-1".to_owned(),
        "path/to/hell".to_owned(),
        "annual".to_owned(),
    )
    .content_type("foo/bar".to_owned())
    .body(payload.to_vec())
    .build();

    assert_message_frame_roundtrip(
        message,
        "msg-1",
        "path/to/hell",
        "annual",
        Some("foo/bar"),
        None,
        &no_custom_headers,
        Some(payload),
    );
}
450
/// A SEND frame with a text body parses into `ClientFrame::Send`, and the body is everything
/// between the blank line and the terminating NUL byte.
#[test]
fn parses_send_frame() {
    let raw = b"SEND\n\
        destination:stairway/to/heaven\n\
        \n\
        Lorem ipsum dolor sit amet,...\x00"
        .to_vec();

    let Ok(ClientFrame::Send(send)) = ClientFrame::try_from(raw) else {
        panic!("Send Frame not parsed correctly");
    };

    let body_text = std::str::from_utf8(send.body().unwrap()).unwrap();
    assert_eq!("Lorem ipsum dolor sit amet,...", body_text);
}
468
/// Panics unless `actual` lies within the `len`-byte region that starts at `ptr`.
///
/// The one-past-the-end address (`ptr + len`) is accepted as well, so an empty slice located
/// at the very end of the region counts as being in range.
///
/// The check compares plain addresses instead of calling `offset_from`: that method is
/// undefined behaviour unless both pointers are derived from the same allocation, which is
/// exactly what cannot be assumed when this helper is used to detect a copy. Neither pointer
/// is ever dereferenced.
///
/// # Panics
///
/// Panics with `offset <n> not in range of <len>` when `actual` is outside the region; `<n>`
/// is the signed distance from `ptr` to `actual` in bytes.
fn assert_in_range(ptr: *const u8, len: usize, actual: *const u8) {
    let start = ptr as usize;
    let address = actual as usize;

    // `checked_sub` yields `None` when `actual` lies before `ptr`.
    let in_range = matches!(address.checked_sub(start), Some(offset) if offset <= len);

    if !in_range {
        // Reported as a signed distance so that addresses below `ptr` show up as negative.
        let offset = (address as isize).wrapping_sub(start as isize);
        panic!("offset {} not in range of {}", offset, len);
    }
}
476
/// Parsing a SEND frame must not copy the input: the body, the destination value and the
/// custom header's name and value are all expected to point into the memory region of the
/// byte buffer that was handed to the parser.
#[test]
fn does_not_copy() {
    let raw = b"SEND\n\
        destination:stairway/to/heaven\n\
        funky:doodle\n\
        \n\
        Lorem ipsum dolor sit amet,...\x00"
        .to_vec();

    // Record the buffer's location before ownership of the buffer moves into the parser.
    let buffer_start = raw.as_ptr();
    let buffer_len = raw.len();

    let Ok(ClientFrame::Send(send)) = ClientFrame::try_from(raw) else {
        panic!("Send Frame not parsed correctly")
    };

    let assert_borrowed = |field: *const u8| assert_in_range(buffer_start, buffer_len, field);

    assert_borrowed(send.body().unwrap().as_ptr());
    assert_borrowed(send.destination().value().as_ptr());
    assert_borrowed(send.custom[0].value().as_ptr());
    assert_borrowed(send.custom[0].header_name().as_ptr());
}
502
/// A parsed frame stays usable after it has been moved to another thread: body and
/// destination are read on the spawned thread and must still hold the parsed values.
///
/// The distance between the body and the start of the source buffer is printed for
/// information only; it is not asserted.
#[test]
fn works_after_move() {
    let message = b"SEND\n\
        destination:stairway/to/heaven\n\
        \n\
        Lorem ipsum dolor sit amet,...\x00"
        .to_vec();

    // Raw pointers are not `Send`, so addresses cross the thread boundary as integers.
    let src_ptr = message.as_ptr() as u64;
    let len = message.len();
    let parsed = ClientFrame::try_from(message);

    let handle = thread::spawn(move || {
        let Ok(ClientFrame::Send(frame)) = parsed else {
            panic!("Send Frame not parsed correctly")
        };

        assert_eq!(
            "Lorem ipsum dolor sit amet,...",
            std::str::from_utf8(frame.body().unwrap()).unwrap()
        );

        assert_eq!("stairway/to/heaven", frame.destination().value());

        frame.body().unwrap().as_ptr() as u64
    });

    let Ok(address) = handle.join() else {
        panic!("Error after move")
    };

    // Signed difference: a plain `u64` subtraction would panic with an overflow in test
    // builds whenever the body happens to live below the source address, failing the test
    // for a reason that has nothing to do with what it checks.
    let offset = i128::from(address) - i128::from(src_ptr);

    println!(
        "Source: {}, Len: {}, Offset: {} ",
        src_ptr, len, offset,
    );
}
540
/// A SEND frame whose body consists of raw bytes, the first of which is NUL, parses into
/// `ClientFrame::Send` with exactly those bytes as its body.
#[test]
fn parses_binary_send_frame() {
    let raw = b"SEND\n\
        destination:stairway/to/heaven\n\
        \n\
        \x00\x01\x01\x02\x03\x05\x08\x0d\
        \x00"
        .to_vec();

    let Ok(ClientFrame::Send(send)) = ClientFrame::try_from(raw) else {
        panic!("Send Frame not parsed correctly")
    };

    let expected_body = [0u8, 1, 1, 2, 3, 5, 8, 13];
    assert_eq!(&expected_body, send.body().unwrap());
}
556}