1use std::collections::HashMap;
5use std::fmt::Display;
6
7use tracing::debug;
8
9use super::encoder::Name;
10use crate::api::{
11 Content, MessageType, ProtoMessage, ProtoName, ProtoPublish, ProtoPublishType,
12 ProtoSessionType, ProtoSubscribe, ProtoSubscribeType, ProtoUnsubscribe, ProtoUnsubscribeType,
13 SessionHeader, SlimHeader, proto::pubsub::v1::SessionMessageType,
14};
15
16use thiserror::Error;
17use tracing::error;
18
19#[derive(Error, Debug, PartialEq)]
20pub enum MessageError {
21 #[error("SLIM header not found")]
22 SlimHeaderNotFound,
23 #[error("source not found")]
24 SourceNotFound,
25 #[error("destination not found")]
26 DestinationNotFound,
27 #[error("session header not found")]
28 SessionHeaderNotFound,
29 #[error("message type not found")]
30 MessageTypeNotFound,
31 #[error("incoming connection not found")]
32 IncomingConnectionNotFound,
33}
34
35pub const SLIM_IDENTITY: &str = "SLIM_IDENTITY";
37
38impl From<&Name> for ProtoName {
40 fn from(name: &Name) -> Self {
41 Self {
42 component_0: name.components()[0],
43 component_1: name.components()[1],
44 component_2: name.components()[2],
45 component_3: name.components()[3],
46 }
47 }
48}
49
50impl Display for MessageType {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
54 MessageType::Publish(_) => write!(f, "publish"),
55 MessageType::Subscribe(_) => write!(f, "subscribe"),
56 MessageType::Unsubscribe(_) => write!(f, "unsubscribe"),
57 }
58 }
59}
60
61#[derive(Debug, Clone)]
63pub struct SlimHeaderFlags {
64 pub fanout: u32,
65 pub recv_from: Option<u64>,
66 pub forward_to: Option<u64>,
67 pub incoming_conn: Option<u64>,
68 pub error: Option<bool>,
69}
70
71impl Default for SlimHeaderFlags {
72 fn default() -> Self {
73 Self {
74 fanout: 1,
75 recv_from: None,
76 forward_to: None,
77 incoming_conn: None,
78 error: None,
79 }
80 }
81}
82
83impl Display for SlimHeaderFlags {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 write!(
86 f,
87 "fanout: {}, recv_from: {:?}, forward_to: {:?}, incoming_conn: {:?}, error: {:?}",
88 self.fanout, self.recv_from, self.forward_to, self.incoming_conn, self.error
89 )
90 }
91}
92
93impl SlimHeaderFlags {
94 pub fn new(
95 fanout: u32,
96 recv_from: Option<u64>,
97 forward_to: Option<u64>,
98 incoming_conn: Option<u64>,
99 error: Option<bool>,
100 ) -> Self {
101 Self {
102 fanout,
103 recv_from,
104 forward_to,
105 incoming_conn,
106 error,
107 }
108 }
109
110 pub fn with_fanout(self, fanout: u32) -> Self {
111 Self { fanout, ..self }
112 }
113
114 pub fn with_recv_from(self, recv_from: u64) -> Self {
115 Self {
116 recv_from: Some(recv_from),
117 ..self
118 }
119 }
120
121 pub fn with_forward_to(self, forward_to: u64) -> Self {
122 Self {
123 forward_to: Some(forward_to),
124 ..self
125 }
126 }
127
128 pub fn with_incoming_conn(self, incoming_conn: u64) -> Self {
129 Self {
130 incoming_conn: Some(incoming_conn),
131 ..self
132 }
133 }
134
135 pub fn with_error(self, error: bool) -> Self {
136 Self {
137 error: Some(error),
138 ..self
139 }
140 }
141}
142
143impl SlimHeader {
147 pub fn new(source: &Name, destination: &Name, flags: Option<SlimHeaderFlags>) -> Self {
148 let flags = flags.unwrap_or_default();
149
150 Self {
151 source: Some(ProtoName::from(source)),
152 destination: Some(ProtoName::from(destination)),
153 fanout: flags.fanout,
154 recv_from: flags.recv_from,
155 forward_to: flags.forward_to,
156 incoming_conn: flags.incoming_conn,
157 error: flags.error,
158 }
159 }
160
161 pub fn clear(&mut self) {
162 self.recv_from = None;
163 self.forward_to = None;
164 }
165
166 pub fn get_recv_from(&self) -> Option<u64> {
167 self.recv_from
168 }
169
170 pub fn get_forward_to(&self) -> Option<u64> {
171 self.forward_to
172 }
173
174 pub fn get_incoming_conn(&self) -> Option<u64> {
175 self.incoming_conn
176 }
177
178 pub fn get_error(&self) -> Option<bool> {
179 self.error
180 }
181
182 pub fn get_source(&self) -> Name {
183 match &self.source {
184 Some(source) => Name::from(source),
185 None => panic!("source not found"),
186 }
187 }
188
189 pub fn get_dst(&self) -> Name {
190 match &self.destination {
191 Some(destination) => Name::from(destination),
192 None => panic!("destination not found"),
193 }
194 }
195
196 pub fn set_source(&mut self, source: &Name) {
197 self.source = Some(ProtoName::from(source));
198 }
199
200 pub fn set_destination(&mut self, dst: &Name) {
201 self.destination = Some(ProtoName::from(dst));
202 }
203
204 pub fn get_fanout(&self) -> u32 {
205 self.fanout
206 }
207
208 pub fn set_recv_from(&mut self, recv_from: Option<u64>) {
209 self.recv_from = recv_from;
210 }
211
212 pub fn set_forward_to(&mut self, forward_to: Option<u64>) {
213 self.forward_to = forward_to;
214 }
215
216 pub fn set_error(&mut self, error: Option<bool>) {
217 self.error = error;
218 }
219
220 pub fn set_incoming_conn(&mut self, incoming_conn: Option<u64>) {
221 self.incoming_conn = incoming_conn;
222 }
223
224 pub fn set_error_flag(&mut self, error: Option<bool>) {
225 self.error = error;
226 }
227
228 pub fn set_fanout(&mut self, fanout: u32) {
229 self.fanout = fanout;
230 }
231
232 pub fn get_in_out_connections(&self) -> (u64, Option<u64>) {
236 let incoming = self
238 .get_incoming_conn()
239 .expect("incoming connection not found");
240
241 if let Some(val) = self.get_recv_from() {
242 debug!(
243 "received recv_from command, update state on connection {}",
244 val
245 );
246 return (val, None);
247 }
248
249 if let Some(val) = self.get_forward_to() {
250 debug!(
251 "received forward_to command, update state and forward to connection {}",
252 val
253 );
254 return (incoming, Some(val));
255 }
256
257 (incoming, None)
259 }
260}
261
262impl SessionHeader {
266 pub fn new(
267 session_type: i32,
268 session_message_type: i32,
269 session_id: u32,
270 message_id: u32,
271 ) -> Self {
272 Self {
273 session_type,
274 session_message_type,
275 session_id,
276 message_id,
277 }
278 }
279
280 pub fn get_session_id(&self) -> u32 {
281 self.session_id
282 }
283
284 pub fn get_message_id(&self) -> u32 {
285 self.message_id
286 }
287
288 pub fn set_session_id(&mut self, session_id: u32) {
289 self.session_id = session_id;
290 }
291
292 pub fn set_message_id(&mut self, message_id: u32) {
293 self.message_id = message_id;
294 }
295
296 pub fn clear(&mut self) {
297 self.session_id = 0;
298 self.message_id = 0;
299 }
300}
301
302impl ProtoSubscribe {
305 pub fn new(source: &Name, dst: &Name, flags: Option<SlimHeaderFlags>) -> Self {
306 let header = Some(SlimHeader::new(source, dst, flags));
307
308 ProtoSubscribe {
309 header,
310 component_0: dst.components_strings().unwrap()[0].clone(),
311 component_1: dst.components_strings().unwrap()[1].clone(),
312 component_2: dst.components_strings().unwrap()[2].clone(),
313 }
314 }
315}
316
317impl From<ProtoMessage> for ProtoSubscribe {
319 fn from(message: ProtoMessage) -> Self {
320 match message.message_type {
321 Some(ProtoSubscribeType(s)) => s,
322 _ => panic!("message type is not subscribe"),
323 }
324 }
325}
326
327impl ProtoUnsubscribe {
330 pub fn with_header(header: Option<SlimHeader>) -> Self {
331 ProtoUnsubscribe { header }
332 }
333
334 pub fn new(source: &Name, dst: &Name, flags: Option<SlimHeaderFlags>) -> Self {
335 let header = Some(SlimHeader::new(source, dst, flags));
336
337 Self::with_header(header)
338 }
339}
340
341impl From<ProtoMessage> for ProtoUnsubscribe {
343 fn from(message: ProtoMessage) -> Self {
344 match message.message_type {
345 Some(ProtoUnsubscribeType(u)) => u,
346 _ => panic!("message type is not unsubscribe"),
347 }
348 }
349}
350
351impl ProtoPublish {
354 pub fn with_header(
355 header: Option<SlimHeader>,
356 session: Option<SessionHeader>,
357 payload: Option<Content>,
358 ) -> Self {
359 ProtoPublish {
360 header,
361 session,
362 msg: payload,
363 }
364 }
365
366 pub fn new(
367 source: &Name,
368 dst: &Name,
369 flags: Option<SlimHeaderFlags>,
370 content_type: &str,
371 blob: Vec<u8>,
372 ) -> Self {
373 let slim_header = Some(SlimHeader::new(source, dst, flags));
374
375 let session_header = Some(SessionHeader::default());
376
377 let msg = Some(Content {
378 content_type: content_type.to_string(),
379 blob,
380 });
381
382 Self::with_header(slim_header, session_header, msg)
383 }
384
385 pub fn get_slim_header(&self) -> &SlimHeader {
386 self.header.as_ref().unwrap()
387 }
388
389 pub fn get_session_header(&self) -> &SessionHeader {
390 self.session.as_ref().unwrap()
391 }
392
393 pub fn get_slim_header_as_mut(&mut self) -> &mut SlimHeader {
394 self.header.as_mut().unwrap()
395 }
396
397 pub fn get_session_header_as_mut(&mut self) -> &mut SessionHeader {
398 self.session.as_mut().unwrap()
399 }
400
401 pub fn get_payload(&self) -> &Content {
402 self.msg.as_ref().unwrap()
403 }
404}
405
406impl From<ProtoMessage> for ProtoPublish {
408 fn from(message: ProtoMessage) -> Self {
409 match message.message_type {
410 Some(ProtoPublishType(p)) => p,
411 _ => panic!("message type is not publish"),
412 }
413 }
414}
415
416impl ProtoMessage {
419 fn new(metadata: HashMap<String, String>, message_type: MessageType) -> Self {
420 ProtoMessage {
421 metadata,
422 message_type: Some(message_type),
423 }
424 }
425
426 pub fn new_subscribe(source: &Name, dst: &Name, flags: Option<SlimHeaderFlags>) -> Self {
427 let subscribe = ProtoSubscribe::new(source, dst, flags);
428
429 Self::new(HashMap::new(), ProtoSubscribeType(subscribe))
430 }
431
432 pub fn new_unsubscribe(source: &Name, dst: &Name, flags: Option<SlimHeaderFlags>) -> Self {
433 let unsubscribe = ProtoUnsubscribe::new(source, dst, flags);
434
435 Self::new(HashMap::new(), ProtoUnsubscribeType(unsubscribe))
436 }
437
438 pub fn new_publish(
439 source: &Name,
440 dst: &Name,
441 flags: Option<SlimHeaderFlags>,
442 content_type: &str,
443 blob: Vec<u8>,
444 ) -> Self {
445 let publish = ProtoPublish::new(source, dst, flags, content_type, blob);
446
447 Self::new(HashMap::new(), ProtoPublishType(publish))
448 }
449
450 pub fn new_publish_with_headers(
451 slim_header: Option<SlimHeader>,
452 session_header: Option<SessionHeader>,
453 content_type: &str,
454 blob: Vec<u8>,
455 ) -> Self {
456 let publish = ProtoPublish::with_header(
457 slim_header,
458 session_header,
459 Some(Content {
460 content_type: content_type.to_string(),
461 blob,
462 }),
463 );
464
465 Self::new(HashMap::new(), ProtoPublishType(publish))
466 }
467
468 pub fn validate(&self) -> Result<(), MessageError> {
470 if self.message_type.is_none() {
472 return Err(MessageError::MessageTypeNotFound);
473 }
474
475 if self.try_get_slim_header().is_none() {
477 return Err(MessageError::SlimHeaderNotFound);
478 }
479
480 let slim_header = self.get_slim_header();
482
483 if slim_header.source.is_none() {
485 return Err(MessageError::SourceNotFound);
486 }
487 if slim_header.destination.is_none() {
488 return Err(MessageError::DestinationNotFound);
489 }
490
491 match &self.message_type {
492 Some(ProtoPublishType(p)) => {
493 if p.header.is_none() {
495 return Err(MessageError::SlimHeaderNotFound);
496 }
497
498 if p.session.is_none() {
500 return Err(MessageError::SessionHeaderNotFound);
501 }
502 }
503 Some(ProtoSubscribeType(s)) => {
504 if s.header.is_none() {
505 return Err(MessageError::SlimHeaderNotFound);
506 }
507 }
508 Some(ProtoUnsubscribeType(u)) => {
509 if u.header.is_none() {
510 return Err(MessageError::SlimHeaderNotFound);
511 }
512 }
513 None => return Err(MessageError::MessageTypeNotFound),
514 }
515
516 Ok(())
517 }
518
519 pub fn insert_metadata(&mut self, key: String, val: String) {
522 self.metadata.insert(key, val);
523 }
524
525 pub fn remove_metadata(&mut self, key: &str) {
527 self.metadata.remove(key);
528 }
529
530 pub fn contains_metadata(&self, key: &str) -> bool {
531 self.metadata.contains_key(key)
532 }
533
534 pub fn get_metadata(&self, key: &str) -> Option<&String> {
535 self.metadata.get(key)
536 }
537
538 pub fn get_slim_header(&self) -> &SlimHeader {
539 match &self.message_type {
540 Some(ProtoPublishType(publish)) => publish.header.as_ref().unwrap(),
541 Some(ProtoSubscribeType(sub)) => sub.header.as_ref().unwrap(),
542 Some(ProtoUnsubscribeType(unsub)) => unsub.header.as_ref().unwrap(),
543 None => panic!("SLIM header not found"),
544 }
545 }
546
547 pub fn get_slim_header_mut(&mut self) -> &mut SlimHeader {
548 match &mut self.message_type {
549 Some(ProtoPublishType(publish)) => publish.header.as_mut().unwrap(),
550 Some(ProtoSubscribeType(sub)) => sub.header.as_mut().unwrap(),
551 Some(ProtoUnsubscribeType(unsub)) => unsub.header.as_mut().unwrap(),
552 None => panic!("SLIM header not found"),
553 }
554 }
555
556 pub fn try_get_slim_header(&self) -> Option<&SlimHeader> {
557 match &self.message_type {
558 Some(ProtoPublishType(publish)) => publish.header.as_ref(),
559 Some(ProtoSubscribeType(sub)) => sub.header.as_ref(),
560 Some(ProtoUnsubscribeType(unsub)) => unsub.header.as_ref(),
561 None => None,
562 }
563 }
564
565 pub fn get_session_header(&self) -> &SessionHeader {
566 match &self.message_type {
567 Some(ProtoPublishType(publish)) => publish.session.as_ref().unwrap(),
568 Some(ProtoSubscribeType(_)) => panic!("session header not found"),
569 Some(ProtoUnsubscribeType(_)) => panic!("session header not found"),
570 None => panic!("session header not found"),
571 }
572 }
573
574 pub fn get_session_header_mut(&mut self) -> &mut SessionHeader {
575 match &mut self.message_type {
576 Some(ProtoPublishType(publish)) => publish.session.as_mut().unwrap(),
577 Some(ProtoSubscribeType(_)) => panic!("session header not found"),
578 Some(ProtoUnsubscribeType(_)) => panic!("session header not found"),
579 None => panic!("session header not found"),
580 }
581 }
582
583 pub fn try_get_session_header(&self) -> Option<&SessionHeader> {
584 match &self.message_type {
585 Some(ProtoPublishType(publish)) => publish.session.as_ref(),
586 Some(ProtoSubscribeType(_)) => None,
587 Some(ProtoUnsubscribeType(_)) => None,
588 None => None,
589 }
590 }
591
592 pub fn try_get_session_header_mut(&mut self) -> Option<&mut SessionHeader> {
593 match &mut self.message_type {
594 Some(ProtoPublishType(publish)) => publish.session.as_mut(),
595 Some(ProtoSubscribeType(_)) => None,
596 Some(ProtoUnsubscribeType(_)) => None,
597 None => None,
598 }
599 }
600
601 pub fn get_id(&self) -> u32 {
602 self.get_session_header().get_message_id()
603 }
604
605 pub fn get_source(&self) -> Name {
606 self.get_slim_header().get_source()
607 }
608
609 pub fn get_fanout(&self) -> u32 {
610 self.get_slim_header().get_fanout()
611 }
612
613 pub fn get_recv_from(&self) -> Option<u64> {
614 self.get_slim_header().get_recv_from()
615 }
616
617 pub fn get_forward_to(&self) -> Option<u64> {
618 self.get_slim_header().get_forward_to()
619 }
620
621 pub fn get_error(&self) -> Option<bool> {
622 self.get_slim_header().get_error()
623 }
624
625 pub fn get_incoming_conn(&self) -> u64 {
626 self.get_slim_header().get_incoming_conn().unwrap()
627 }
628
629 pub fn try_get_incoming_conn(&self) -> Option<u64> {
630 self.get_slim_header().get_incoming_conn()
631 }
632
633 pub fn get_dst(&self) -> Name {
634 let dst = self.get_slim_header().get_dst();
635
636 if let Some(ProtoSubscribeType(subscribe)) = &self.message_type {
638 return Name::from_strings([
639 subscribe.component_0.clone(),
640 subscribe.component_1.clone(),
641 subscribe.component_2.clone(),
642 ])
643 .with_id(dst.id());
644 }
645
646 dst
647 }
648
649 pub fn get_type(&self) -> &MessageType {
650 match &self.message_type {
651 Some(t) => t,
652 None => panic!("message type not found"),
653 }
654 }
655
656 pub fn get_payload(&self) -> Option<&Content> {
657 match &self.message_type {
658 Some(ProtoPublishType(p)) => p.msg.as_ref(),
659 Some(ProtoSubscribeType(_)) => panic!("payload not found"),
660 Some(ProtoUnsubscribeType(_)) => panic!("payload not found"),
661 None => panic!("payload not found"),
662 }
663 }
664
665 pub fn get_session_message_type(&self) -> SessionMessageType {
666 self.get_session_header()
667 .session_message_type
668 .try_into()
669 .unwrap_or_default()
670 }
671
672 pub fn clear_slim_header(&mut self) {
673 self.get_slim_header_mut().clear();
674 }
675
676 pub fn set_recv_from(&mut self, recv_from: Option<u64>) {
677 self.get_slim_header_mut().set_recv_from(recv_from);
678 }
679
680 pub fn set_forward_to(&mut self, forward_to: Option<u64>) {
681 self.get_slim_header_mut().set_forward_to(forward_to);
682 }
683
684 pub fn set_error(&mut self, error: Option<bool>) {
685 self.get_slim_header_mut().set_error(error);
686 }
687
688 pub fn set_fanout(&mut self, fanout: u32) {
689 self.get_slim_header_mut().set_fanout(fanout);
690 }
691
692 pub fn set_incoming_conn(&mut self, incoming_conn: Option<u64>) {
693 self.get_slim_header_mut().set_incoming_conn(incoming_conn);
694 }
695
696 pub fn set_error_flag(&mut self, error: Option<bool>) {
697 self.get_slim_header_mut().set_error_flag(error);
698 }
699
700 pub fn set_session_message_type(&mut self, message_type: SessionMessageType) {
701 self.get_session_header_mut()
702 .set_session_message_type(message_type);
703 }
704
705 pub fn set_session_type(&mut self, session_type: ProtoSessionType) {
706 self.get_session_header_mut().set_session_type(session_type);
707 }
708
709 pub fn get_session_type(&self) -> ProtoSessionType {
710 self.get_session_header().session_type()
711 }
712
713 pub fn set_message_id(&mut self, message_id: u32) {
714 self.get_session_header_mut().set_message_id(message_id);
715 }
716
717 pub fn is_publish(&self) -> bool {
718 matches!(self.get_type(), MessageType::Publish(_))
719 }
720
721 pub fn is_subscribe(&self) -> bool {
722 matches!(self.get_type(), MessageType::Subscribe(_))
723 }
724
725 pub fn is_unsubscribe(&self) -> bool {
726 matches!(self.get_type(), MessageType::Unsubscribe(_))
727 }
728}
729
730impl AsRef<ProtoPublish> for ProtoMessage {
731 fn as_ref(&self) -> &ProtoPublish {
732 match &self.message_type {
733 Some(ProtoPublishType(p)) => p,
734 _ => panic!("message type is not publish"),
735 }
736 }
737}
738
739#[cfg(test)]
740mod tests {
741 use crate::{api::proto::pubsub::v1::SessionMessageType, messages::encoder::Name};
742
743 use super::*;
744
745 fn test_subscription_template(
746 subscription: bool,
747 source: Name,
748 dst: Name,
749 flags: Option<SlimHeaderFlags>,
750 ) {
751 let sub = {
752 if subscription {
753 ProtoMessage::new_subscribe(&source, &dst, flags.clone())
754 } else {
755 ProtoMessage::new_unsubscribe(&source, &dst, flags.clone())
756 }
757 };
758
759 let flags = if flags.is_none() {
760 Some(SlimHeaderFlags::default())
761 } else {
762 flags
763 };
764
765 assert!(!sub.is_publish());
766 assert_eq!(sub.is_subscribe(), subscription);
767 assert_eq!(sub.is_unsubscribe(), !subscription);
768 assert_eq!(flags.as_ref().unwrap().recv_from, sub.get_recv_from());
769 assert_eq!(flags.as_ref().unwrap().forward_to, sub.get_forward_to());
770 assert_eq!(None, sub.try_get_incoming_conn());
771 assert_eq!(source, sub.get_source());
772 let got_name = sub.get_dst();
773 assert_eq!(dst, got_name);
774 }
775
776 fn test_publish_template(source: Name, dst: Name, flags: Option<SlimHeaderFlags>) {
777 let pub_msg = ProtoMessage::new_publish(
778 &source,
779 &dst,
780 flags.clone(),
781 "str",
782 "this is the content of the message".into(),
783 );
784
785 let flags = if flags.is_none() {
786 Some(SlimHeaderFlags::default())
787 } else {
788 flags
789 };
790
791 assert!(pub_msg.is_publish());
792 assert!(!pub_msg.is_subscribe());
793 assert!(!pub_msg.is_unsubscribe());
794 assert_eq!(flags.as_ref().unwrap().recv_from, pub_msg.get_recv_from());
795 assert_eq!(flags.as_ref().unwrap().forward_to, pub_msg.get_forward_to());
796 assert_eq!(None, pub_msg.try_get_incoming_conn());
797 assert_eq!(source, pub_msg.get_source());
798 let got_name = pub_msg.get_dst();
799 assert_eq!(dst, got_name);
800 assert_eq!(flags.as_ref().unwrap().fanout, pub_msg.get_fanout());
801 }
802
803 #[test]
804 fn test_subscription() {
805 let source = Name::from_strings(["org", "ns", "type"]).with_id(1);
806 let dst = Name::from_strings(["org", "ns", "type"]).with_id(2);
807
808 test_subscription_template(true, source.clone(), dst.clone(), None);
810
811 test_subscription_template(true, source.clone(), dst.clone(), None);
813
814 test_subscription_template(
816 true,
817 source.clone(),
818 dst.clone(),
819 Some(SlimHeaderFlags::default().with_recv_from(50)),
820 );
821
822 test_subscription_template(
824 true,
825 source.clone(),
826 dst.clone(),
827 Some(SlimHeaderFlags::default().with_forward_to(30)),
828 );
829 }
830
831 #[test]
832 fn test_unsubscription() {
833 let source = Name::from_strings(["org", "ns", "type"]).with_id(1);
834 let dst = Name::from_strings(["org", "ns", "type"]).with_id(2);
835
836 test_subscription_template(false, source.clone(), dst.clone(), None);
838
839 test_subscription_template(false, source.clone(), dst.clone(), None);
841
842 test_subscription_template(
844 false,
845 source.clone(),
846 dst.clone(),
847 Some(SlimHeaderFlags::default().with_recv_from(50)),
848 );
849
850 test_subscription_template(
852 false,
853 source.clone(),
854 dst.clone(),
855 Some(SlimHeaderFlags::default().with_forward_to(30)),
856 );
857 }
858
859 #[test]
860 fn test_publish() {
861 let source = Name::from_strings(["org", "ns", "type"]).with_id(1);
862 let mut dst = Name::from_strings(["org", "ns", "type"]);
863
864 test_publish_template(
866 source.clone(),
867 dst.clone(),
868 Some(SlimHeaderFlags::default()),
869 );
870
871 dst.set_id(2);
873 test_publish_template(
874 source.clone(),
875 dst.clone(),
876 Some(SlimHeaderFlags::default()),
877 );
878 dst.reset_id();
879
880 test_publish_template(
882 source.clone(),
883 dst.clone(),
884 Some(SlimHeaderFlags::default().with_recv_from(50)),
885 );
886
887 test_publish_template(
889 source.clone(),
890 dst.clone(),
891 Some(SlimHeaderFlags::default().with_forward_to(30)),
892 );
893
894 test_publish_template(
896 source.clone(),
897 dst.clone(),
898 Some(SlimHeaderFlags::default().with_fanout(2)),
899 );
900 }
901
902 #[test]
903 fn test_conversions() {
904 let name = Name::from_strings(["org", "ns", "type"]).with_id(1);
906 let proto_name = ProtoName::from(&name);
907
908 assert_eq!(proto_name.component_0, name.components()[0]);
909 assert_eq!(proto_name.component_1, name.components()[1]);
910 assert_eq!(proto_name.component_2, name.components()[2]);
911 assert_eq!(proto_name.component_3, name.components()[3]);
912
913 let name_from_proto = Name::from(&proto_name);
915 assert_eq!(name_from_proto.components()[0], proto_name.component_0);
916 assert_eq!(name_from_proto.components()[1], proto_name.component_1);
917 assert_eq!(name_from_proto.components()[2], proto_name.component_2);
918 assert_eq!(name_from_proto.components()[3], proto_name.component_3);
919
920 let dst = Name::from_strings(["org", "ns", "type"]).with_id(1);
922 let proto_subscribe = ProtoMessage::new_subscribe(
923 &name,
924 &dst,
925 Some(
926 SlimHeaderFlags::default()
927 .with_recv_from(2)
928 .with_forward_to(3),
929 ),
930 );
931 let proto_subscribe = ProtoSubscribe::from(proto_subscribe);
932 assert_eq!(proto_subscribe.header.as_ref().unwrap().get_source(), name);
933 assert_eq!(proto_subscribe.header.as_ref().unwrap().get_dst(), dst,);
934
935 let proto_unsubscribe = ProtoMessage::new_unsubscribe(
937 &name,
938 &dst,
939 Some(
940 SlimHeaderFlags::default()
941 .with_recv_from(2)
942 .with_forward_to(3),
943 ),
944 );
945 let proto_unsubscribe = ProtoUnsubscribe::from(proto_unsubscribe);
946 assert_eq!(
947 proto_unsubscribe.header.as_ref().unwrap().get_source(),
948 name
949 );
950 assert_eq!(proto_unsubscribe.header.as_ref().unwrap().get_dst(), dst);
951
952 let proto_publish = ProtoMessage::new_publish(
954 &name,
955 &dst,
956 Some(
957 SlimHeaderFlags::default()
958 .with_recv_from(2)
959 .with_forward_to(3),
960 ),
961 "str",
962 "this is the content of the message".into(),
963 );
964 let proto_publish = ProtoPublish::from(proto_publish);
965 assert_eq!(proto_publish.header.as_ref().unwrap().get_source(), name);
966 assert_eq!(proto_publish.header.as_ref().unwrap().get_dst(), dst);
967 }
968
969 #[test]
970 fn test_panic() {
971 let source = Name::from_strings(["org", "ns", "type"]).with_id(1);
972 let dst = Name::from_strings(["org", "ns", "type"]).with_id(2);
973
974 let msg = ProtoMessage::new_subscribe(
976 &source,
977 &dst,
978 Some(
979 SlimHeaderFlags::default()
980 .with_recv_from(2)
981 .with_forward_to(3),
982 ),
983 );
984
985 let result = std::panic::catch_unwind(|| ProtoUnsubscribe::from(msg.clone()));
988 assert!(result.is_err());
989
990 let result = std::panic::catch_unwind(|| ProtoPublish::from(msg.clone()));
993 assert!(result.is_err());
994
995 let result = std::panic::catch_unwind(|| ProtoSubscribe::from(msg));
997 assert!(result.is_ok());
998 }
999
1000 #[test]
1001 fn test_panic_header() {
1002 let header = SlimHeader {
1004 source: None,
1005 destination: None,
1006 fanout: 0,
1007 recv_from: None,
1008 forward_to: None,
1009 incoming_conn: None,
1010 error: None,
1011 };
1012
1013 let result = std::panic::catch_unwind(|| header.get_source());
1015 assert!(result.is_err());
1016
1017 let result = std::panic::catch_unwind(|| header.get_dst());
1018 assert!(result.is_err());
1019
1020 let result = std::panic::catch_unwind(|| header.get_recv_from());
1022 assert!(result.is_ok());
1023
1024 let result = std::panic::catch_unwind(|| header.get_forward_to());
1025 assert!(result.is_ok());
1026
1027 let result = std::panic::catch_unwind(|| header.get_incoming_conn());
1029 assert!(result.is_ok());
1030
1031 let result = std::panic::catch_unwind(|| header.get_error());
1033 assert!(result.is_ok());
1034 }
1035
1036 #[test]
1037 fn test_panic_session_header() {
1038 let header = SessionHeader::new(0, 0, 0, 0);
1040
1041 let result = std::panic::catch_unwind(|| header.get_session_id());
1043 assert!(result.is_ok());
1044
1045 let result = std::panic::catch_unwind(|| header.get_message_id());
1046 assert!(result.is_ok());
1047 }
1048
1049 #[test]
1050 fn test_panic_proto_message() {
1051 let message = ProtoMessage {
1053 metadata: HashMap::new(),
1054 message_type: None,
1055 };
1056
1057 let result = std::panic::catch_unwind(|| message.get_slim_header());
1059 assert!(result.is_err());
1060
1061 let result = std::panic::catch_unwind(|| message.get_type());
1063 assert!(result.is_err());
1064
1065 let result = std::panic::catch_unwind(|| message.get_source());
1067 assert!(result.is_err());
1068 let result = std::panic::catch_unwind(|| message.get_dst());
1069 assert!(result.is_err());
1070 let result = std::panic::catch_unwind(|| message.get_recv_from());
1071 assert!(result.is_err());
1072 let result = std::panic::catch_unwind(|| message.get_forward_to());
1073 assert!(result.is_err());
1074 let result = std::panic::catch_unwind(|| message.get_incoming_conn());
1075 assert!(result.is_err());
1076 let result = std::panic::catch_unwind(|| message.get_fanout());
1077 assert!(result.is_err());
1078 }
1079
1080 #[test]
1081 fn test_service_type_to_int() {
1082 let total_service_types = SessionMessageType::ChannelMlsAck as i32;
1084
1085 for i in 0..total_service_types {
1086 let service_type =
1088 SessionMessageType::try_from(i).expect("failed to convert int to service type");
1089 let service_type_int = i32::from(service_type);
1090 assert_eq!(service_type_int, i32::from(service_type),);
1091 }
1092
1093 let invalid_service_type = SessionMessageType::try_from(total_service_types + 1);
1095 assert!(invalid_service_type.is_err());
1096 }
1097}