1#![warn(missing_docs)]
2#[macro_use]
12extern crate serde_derive;
13extern crate serde_bytes;
14extern crate serde_cbor;
15
16mod frame;
17mod local_db;
18mod peer;
19mod server;
20
21use siphasher::sip::SipHasher;
22use std::hash::{Hash, Hasher};
23use std::io::Cursor;
24use std::io::Write;
25use std::io::{Error, Read};
26use std::net::TcpStream;
27
28use bytes::{Buf, BufMut, BytesMut};
29use std::net::{IpAddr, SocketAddr};
30
/// Size in bytes of the fixed header: one big-endian `u32` carrying the type
/// and length, followed by one big-endian `u32` carrying the cid.
pub(crate) const HDRL: usize = 8;
/// Size in bytes of the cid field, the second half of the fixed header.
pub(crate) const CIDL: usize = 4;
/// Size in bytes of the `u64` key that follows the fixed header.
pub(crate) const KEYL: usize = 8;
/// Size in bytes of the fixed header together with the key.
pub(crate) const HDRKEYL: usize = HDRL + KEYL;

/// Largest value the 24-bit length field of the header can hold.
pub(crate) const MSGMAXSIZE: usize = 0x00ff_ffff;

// Keep-alive setting; not referenced in this file.
// NOTE(review): the unit is presumably seconds -- confirm against the user.
const KEEPALIVE: u64 = 5;
44
45pub struct MsgHdr {
52 thlen: u32,
53 cid: u32,
54 key: u64,
55}
56
57impl MsgHdr {
58 pub fn new(len: u32, cid: u32, key: u64) -> MsgHdr {
71 MsgHdr {
72 thlen: hdr_set_len(len),
73 cid,
74 key,
75 }
76 }
77
78 pub fn get_type(&self) -> u8 {
93 hdr_get_type(self.thlen)
94 }
95
96 pub fn get_hdrkey_len() -> usize {
99 HDRKEYL
100 }
101
102 pub fn set_len(&mut self, len: u32) {
116 self.thlen = hdr_set_len(len);
117 }
118
119 pub fn get_len(&self) -> u32 {
134 hdr_get_len(self.thlen)
135 }
136
137 pub fn set_cid(&mut self, cid: u32) {
151 self.cid = cid;
152 }
153
154 pub fn get_cid(&self) -> u32 {
169 self.cid
170 }
171
172 pub fn set_key(&mut self, key: u64) {
186 self.key = key;
187 }
188
189 pub fn get_key(&self) -> u64 {
204 self.key
205 }
206
207 pub fn encode(&self) -> Vec<u8> {
222 let mut msgv = write_hdr(self.get_len() as usize, self.get_cid()).to_vec();
223 msgv.extend(write_key(self.get_key()).to_vec());
224 msgv
225 }
226
227 pub fn decode(buf: Vec<u8>) -> MsgHdr {
246 MsgHdr::new(
247 read_hdr_len(&buf) as u32,
248 read_cid_from_hdr(&buf),
249 read_key_from_hdr(&buf),
250 )
251 }
252 #[inline]
264 pub fn do_hash(t: &[String]) -> u64 {
265 let mut s = SipHasher::new();
266 for item in t {
267 item.hash(&mut s);
268 }
269 s.finish()
270 }
271
272 #[inline]
282 pub fn select_cid(key: u64) -> u32 {
283 key as u32
284 }
285
286 #[inline]
308 pub fn addr2str(addr: &SocketAddr) -> String {
309 let ipaddr = addr.ip();
310 match ipaddr {
311 IpAddr::V4(v4) => {
312 let v4oct = v4.octets();
313 let v4str = format!(
314 "{}.{}.{}.{}:{}",
315 v4oct[0],
316 v4oct[1],
317 v4oct[2],
318 v4oct[3],
319 addr.port()
320 );
321 v4str
322 }
323 IpAddr::V6(v6) => {
324 let v6seg = v6.segments();
325 let v6str = format!(
326 "[{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}]:{}",
327 v6seg[0],
328 v6seg[1],
329 v6seg[2],
330 v6seg[3],
331 v6seg[4],
332 v6seg[5],
333 v6seg[6],
334 v6seg[7],
335 addr.port()
336 );
337 v6str
338 }
339 }
340 }
341}
342
/// Packs a payload length into a header word: message type `'M'` (77) in the
/// top 8 bits and the low 24 bits of `len` underneath.
fn hdr_set_len(len: u32) -> u32 {
    let type_bits = u32::from(b'M') << 24;
    let len_bits = len & 0x00ff_ffff;
    type_bits | len_bits
}
346
/// Extracts the payload length (low 24 bits) from a packed header word.
fn hdr_get_len(thlen: u32) -> u32 {
    const LEN_MASK: u32 = 0x00ff_ffff;
    thlen & LEN_MASK
}
350
/// Extracts the message type (top 8 bits) from a packed header word.
fn hdr_get_type(thlen: u32) -> u8 {
    let type_bits = (thlen & 0xff00_0000) >> 24;
    type_bits as u8
}
354
355#[derive(Serialize, Deserialize, Debug, Clone)]
361pub struct Msg {
362 uid: String,
363 channel: String,
364 #[serde(with = "serde_bytes")]
365 message: Vec<u8>,
366}
367
368impl Msg {
369 #[inline]
378 pub fn new(uid: String, channel: String, message: Vec<u8>) -> Msg {
379 Msg {
380 uid,
381 channel,
382 message,
383 }
384 }
385
386 #[inline]
398 pub fn set_uid(mut self, uid: String) -> Msg {
399 self.uid = uid;
400 self
401 }
402
403 #[inline]
415 pub fn set_channel(mut self, channel: String) -> Msg {
416 self.channel = channel;
417 self
418 }
419
420 #[inline]
431 pub fn set_message(mut self, message: Vec<u8>) -> Msg {
432 self.message = message;
433 self
434 }
435
436 #[inline]
438 pub fn get_uid(&self) -> &String {
439 &self.uid
440 }
441
442 #[inline]
444 pub fn get_channel(&self) -> &String {
445 &self.channel
446 }
447
448 #[inline]
458 pub fn get_message(&self) -> &Vec<u8> {
459 &self.message
460 }
461
462 #[inline]
472 pub fn get_message_len(&self) -> usize {
473 self.message.len()
474 }
475
476 #[inline]
488 pub fn get_mut_message(&mut self) -> &mut Vec<u8> {
489 &mut self.message
490 }
491
492 #[inline]
505 pub fn encode(&self) -> Vec<u8> {
506 let encoded = serde_cbor::to_vec(self);
507 match encoded {
508 Ok(encoded) => encoded,
509 Err(err) => {
510 println!("Error on encode: {}", err);
511 Vec::new()
512 }
513 }
514 }
515
516 #[inline]
530 pub fn decode(slice: &[u8]) -> Msg {
531 let value = serde_cbor::from_slice(slice);
532 match value {
533 Ok(value) => value,
534 Err(err) => {
535 println!("Error on decode: {}", err);
536 Msg {
537 uid: "".to_string(),
538 channel: "".to_string(),
539 message: Vec::new(),
540 } }
542 }
543 }
544}
545
546#[derive(Serialize, Deserialize, Debug, Clone)]
547struct MsgVec {
548 #[serde(with = "serde_bytes")]
549 encoded_msg: Vec<u8>, }
551
552impl MsgVec {
553 pub fn new(encoded_msg: &Vec<u8>) -> MsgVec {
554 MsgVec {
555 encoded_msg: encoded_msg.clone(),
556 }
557 }
558
559 pub fn get(&self) -> &Vec<u8> {
560 &self.encoded_msg
561 }
562}
563
564#[derive(Serialize, Deserialize, Debug, Clone)]
572pub struct ResyncMsg {
573 resync_message: Vec<MsgVec>,
574}
575
576impl ResyncMsg {
577 #[inline]
589 pub fn new(messages: &Vec<Vec<u8>>) -> ResyncMsg {
590 let mut rmsg = ResyncMsg {
591 resync_message: Vec::new(),
592 };
593 for msg in messages {
595 rmsg.resync_message.push(MsgVec::new(&msg));
596 }
597 rmsg
598 }
599
600 #[inline]
613 pub fn len(&self) -> usize {
614 self.resync_message.len()
615 }
616
617 #[inline]
631 pub fn get_messages(&self) -> Vec<Vec<u8>> {
632 let mut messages = Vec::new();
634 for msg in self.resync_message.iter() {
635 let msg = msg.get();
636 messages.push(msg.clone());
637 }
638 messages
639 }
640
641 #[inline]
657 pub fn encode(&self) -> Vec<u8> {
658 let encoded = serde_cbor::to_vec(self);
659 match encoded {
660 Ok(encoded) => encoded,
661 Err(err) => {
662 println!("Error on resync encode: {}", err);
663 Vec::new()
664 }
665 }
666 }
667
668 #[inline]
686 pub fn decode(slice: &[u8]) -> ResyncMsg {
687 let value = serde_cbor::from_slice(slice);
688 match value {
689 Ok(value) => value,
690 Err(_) => {
691 ResyncMsg {
692 resync_message: Vec::new(),
693 } }
695 }
696 }
697}
698
699pub struct MsgConn {
704 uid: String,
705 channel: String,
706 key: Option<u64>,
707 stream: Option<TcpStream>,
708}
709
710impl MsgConn {
711 #[inline]
720 pub fn new(uid: String, channel: String) -> MsgConn {
721 MsgConn {
722 uid,
723 channel,
724 key: None,
725 stream: None,
726 }
727 }
728
729 #[inline]
739 pub fn get_uid(&self) -> String {
740 self.uid.clone()
741 }
742
743 #[inline]
753 pub fn get_channel(&self) -> String {
754 self.channel.clone()
755 }
756
757 #[inline]
768 pub fn get_key(&self) -> Option<u64> {
769 self.key
770 }
771
772 #[inline]
775 pub fn connect_with_message(mut self, raddr: SocketAddr, msg: Vec<u8>) -> MsgConn {
776 let msg = Msg::new(self.get_uid(), self.get_channel(), msg);
777 match TcpStream::connect(raddr) {
778 Ok(mut stream) => {
779 let _val = stream.set_nodelay(true);
780
781 if self.get_key().is_none() {
782 let mut keys = Vec::new();
783
784 let laddr = match stream.local_addr() {
785 Ok(laddr) => laddr,
786 Err(_) => {
787 let addr = "0.0.0.0:0";
788 addr.parse::<SocketAddr>().unwrap()
789 }
790 };
791 keys.push(MsgHdr::addr2str(&laddr));
792 keys.push(self.get_uid());
793 keys.push(self.get_channel());
794 let key = MsgHdr::do_hash(&keys);
795 self.key = Some(key);
796 }
797 let encoded_msg = msg.encode();
798 let key = self.get_key().unwrap();
799 let keyv = write_key(key);
800 let mut msgv = write_hdr_with_capacity(
801 encoded_msg.len(),
802 MsgHdr::select_cid(key),
803 HDRKEYL + encoded_msg.len(),
804 );
805 msgv.extend(keyv);
806 msgv.extend(encoded_msg);
807 let msgv = msgv.freeze();
808 match stream.write_all(msgv.as_ref()) {
809 Ok(_) => self.stream = Some(stream),
810 Err(err) => {
811 println!("Send error {}", err);
812 self.stream = None;
813 }
814 }
815 self
816 }
817 Err(_) => {
818 println!("Could not connect to server {}", raddr);
819 self
820 }
821 }
822 }
823
824 #[inline]
827 pub fn connect(self, raddr: SocketAddr) -> MsgConn {
828 self.connect_with_message(raddr, Vec::new())
829 }
830
831 #[inline]
837 pub fn send_message(mut self, msg: Vec<u8>) -> MsgConn {
838 let message = Msg::new(self.get_uid(), self.get_channel(), msg);
839 let encoded_msg = message.encode();
840 let key = self.get_key().unwrap();
841 let keyv = write_key(key);
842 let mut msgv = write_hdr_with_capacity(
843 encoded_msg.len(),
844 MsgHdr::select_cid(key),
845 HDRKEYL + encoded_msg.len(),
846 );
847 msgv.extend(keyv);
848 msgv.extend(encoded_msg);
849 let msgv = msgv.freeze();
850 let mut stream = self.stream.unwrap();
851 match stream.write_all(msgv.as_ref()) {
852 Ok(_) => self.stream = Some(stream),
853 Err(err) => {
854 println!("Send error {}", err);
855 self.stream = None;
856 }
857 }
858 self
859 }
860
861 #[inline]
867 pub fn read_message(mut self) -> (MsgConn, Vec<u8>) {
868 let stream = self.stream.unwrap();
869 loop {
870 let tuple = read_n(&stream, HDRKEYL);
871 let status = tuple.0;
872 if let Ok(0) = status {
873 println!("Read failed: eof");
874 self.stream = None;
875 return (self, Vec::new());
876 }
877 let buf = tuple.1;
878 if buf.is_empty() {
879 continue;
880 }
881 if read_hdr_type(buf.as_slice()) != 'M' as u32 {
882 continue;
883 }
884 let hdr_len = read_hdr_len(buf.as_slice());
885 if 0 == hdr_len {
886 continue;
887 }
888 let tuple = read_n(&stream, hdr_len);
889 let status = tuple.0;
890 if let Ok(0) = status {
891 continue;
892 };
893 let payload = tuple.1;
894 if payload.len() != (hdr_len as usize) {
895 continue;
896 }
897 let decoded_message = Msg::decode(payload.as_slice());
898 if 0 == decoded_message.get_message_len() {
899 continue;
900 }
901 self.stream = Some(stream);
902 return (self, decoded_message.get_message().to_owned());
903 }
904 }
905
906 #[inline]
909 pub fn close(mut self) -> MsgConn {
910 if self.stream.is_some() {
911 drop(self.stream.unwrap());
912 }
913 self.stream = None;
914 self
915 }
916}
917
918#[inline]
919pub(crate) fn read_hdr_type(hdr: &[u8]) -> u32 {
920 if hdr.len() < HDRL {
921 return 0;
922 }
923 let mut buf = Cursor::new(&hdr[..]);
924 let num = buf.get_u32_be();
925 num >> 24
926}
927
928fn read_hdr_len(hdr: &[u8]) -> usize {
929 if hdr.len() < HDRL {
930 return 0;
931 }
932 let mut buf = Cursor::new(&hdr[..]);
933 let num = buf.get_u32_be();
934 (num & 0xffffff) as usize
935}
936
937fn write_hdr(len: usize, cid: u32) -> BytesMut {
938 let hdr = (('M' as u32) << 24) | len as u32;
939 let mut msgv = BytesMut::with_capacity(HDRKEYL);
940 msgv.put_u32_be(hdr);
941 msgv.put_u32_be(cid);
942 msgv
943}
944
945fn write_hdr_with_capacity(len: usize, cid: u32, cap: usize) -> BytesMut {
946 let hdr = (('M' as u32) << 24) | len as u32;
947 let mut msgv = BytesMut::with_capacity(cap);
948 msgv.put_u32_be(hdr);
949 msgv.put_u32_be(cid);
950 msgv
951}
952
953fn write_hdr_without_cid(len: usize) -> BytesMut {
954 let hdr = (('M' as u32) << 24) | len as u32;
955 let mut msgv = BytesMut::with_capacity(HDRL);
956 msgv.put_u32_be(hdr);
957 msgv
958}
959
960#[inline]
961pub(crate) fn write_len_to_hdr(len: usize, mut hdrv: BytesMut) -> BytesMut {
962 if hdrv.len() < HDRL {
963 return BytesMut::new();
964 }
965 let tail = hdrv.split_off(HDRL - CIDL);
966 let mut nhdrv = write_hdr_without_cid(len);
967 nhdrv.extend(tail);
968 nhdrv
969}
970
971fn write_key(val: u64) -> BytesMut {
972 let key = val;
973 let mut msgv = BytesMut::with_capacity(KEYL);
974 msgv.put_u64_be(key);
975 msgv
976}
977
978fn write_hdr_with_key(len: usize, key: u64) -> BytesMut {
979 let mut hdrv = write_hdr(len, MsgHdr::select_cid(key));
980 hdrv.extend(write_key(key));
981 hdrv
982}
983
984fn read_key_from_hdr(keyv: &[u8]) -> u64 {
985 if keyv.len() < HDRKEYL {
986 return 0;
987 }
988 let mut buf = Cursor::new(&keyv[HDRL..]);
989 buf.get_u64_be()
990}
991
992fn read_cid_from_hdr(hdrv: &[u8]) -> u32 {
993 if hdrv.len() < HDRL {
994 return 0;
995 }
996 let mut buf = Cursor::new(&hdrv[(HDRL - CIDL)..]);
997 buf.get_u32_be()
998}
999
/// Public wrapper around `peer::has_peer`; returns whatever that function
/// returns for `peer`.
///
/// NOTE(review): presumably this reports whether a peer address was
/// supplied -- confirm against `peer::has_peer`.
#[inline]
pub fn has_peer(peer: &Option<SocketAddr>) -> bool {
    peer::has_peer(peer)
}
1013
// Reads up to `bytes_to_read` bytes from `reader`, stopping early only at end
// of input or on an error.
//
// Returns the outcome of the read (byte count or error) together with
// whatever bytes were collected.
fn read_n<R>(reader: R, bytes_to_read: usize) -> (Result<usize, Error>, Vec<u8>)
where
    R: Read,
{
    let mut collected = Vec::with_capacity(bytes_to_read);
    let outcome = reader
        .take(bytes_to_read as u64)
        .read_to_end(&mut collected);
    (outcome, collected)
}
1023
/// Public entry point for the server: passes every argument unchanged to
/// `server::run`.
///
/// The tests in this file run it on a dedicated thread and never join that
/// thread.
/// NOTE(review): presumably the call blocks while the server is running,
/// `address` is the listen address and `peer` an optional upstream server;
/// the meaning of `keyval`, `keyaddr`, `hist_limit` and `debug_flags` is
/// defined by `server::run` -- confirm there.
#[inline]
pub fn server_run(
    address: SocketAddr,
    peer: Option<SocketAddr>,
    keyval: String,
    keyaddr: String,
    hist_limit: usize,
    debug_flags: u64,
) {
    server::run(address, peer, keyval, keyaddr, hist_limit, debug_flags);
}
1052
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::SocketAddr;
    use std::thread;
    use std::time::Duration;

    const UID: &str = "User";
    const UID2: &str = "User two";
    const CHANNEL: &str = "Channel";
    const GREETING: &str = "Hello World!";

    // Parses a literal socket address used by a test.
    fn parse_addr(text: &str) -> SocketAddr {
        text.parse::<SocketAddr>().unwrap()
    }

    // Starts a server on its own thread and waits one second for it to come up.
    // The handle is only ever dropped, which leaves the server thread running.
    fn start_server(
        addr: SocketAddr,
        peer: Option<SocketAddr>,
        hist_limit: usize,
    ) -> thread::JoinHandle<()> {
        let handle = thread::spawn(move || {
            server_run(addr, peer, "".to_string(), "".to_string(), hist_limit, 0)
        });
        thread::sleep(Duration::new(1, 0));
        handle
    }

    // Opens a connection for `uid` that sends only the empty connect message.
    fn open(addr: SocketAddr, uid: &str) -> MsgConn {
        MsgConn::new(uid.to_string(), CHANNEL.to_string()).connect(addr)
    }

    // Connects as `uid`, sends `text` as the first message and closes.
    fn send_and_close(addr: SocketAddr, uid: &str, text: &str) {
        let conn = MsgConn::new(uid.to_string(), CHANNEL.to_string())
            .connect_with_message(addr, text.to_string().into_bytes());
        conn.close();
    }

    // Reads one message from `conn`, closes it and returns the text received.
    fn read_and_close(conn: MsgConn) -> String {
        let (conn, msg) = conn.read_message();
        conn.close();
        String::from_utf8_lossy(msg.as_slice()).into_owned()
    }

    // Builds the message used by the encode/decode tests.
    fn sample_msg(uid: &str, channel: &str, text: &str) -> Msg {
        Msg::new(
            uid.to_string(),
            channel.to_string(),
            text.to_string().into_bytes(),
        )
    }

    #[test]
    fn test_read_hdr_len_one() {
        let orig_len = 1;
        let hdrv = write_hdr(orig_len, 0x1);
        assert_eq!(read_hdr_len(hdrv.as_ref()), orig_len);
    }

    #[test]
    fn test_read_hdr_len_16k() {
        let orig_len = 16000;
        let hdrv = write_hdr_with_capacity(orig_len, 0x1, HDRKEYL + orig_len);
        assert_eq!(read_hdr_len(hdrv.as_ref()), orig_len);
    }

    #[test]
    fn test_read_hdr_len_16_7m() {
        let orig_len = 16777215;
        let hdrv = write_hdr(orig_len, 0x1);
        assert_eq!(read_hdr_len(hdrv.as_ref()), orig_len);
    }

    #[test]
    fn test_encode_decode_msg() {
        let orig_msg = sample_msg("User", "Channel", "a test msg");
        let decoded_msg = Msg::decode(&orig_msg.encode());
        assert_eq!(decoded_msg.uid, orig_msg.uid);
        assert_eq!(decoded_msg.channel, orig_msg.channel);
        assert_eq!(decoded_msg.message, orig_msg.message);
    }

    #[test]
    fn test_encode_decode_resync_msg() {
        let orig_msg = sample_msg("User", "Channel", "a test msg");
        let orig_msg2 = sample_msg("User two", "Channel two", "a test msg two");
        let encoded = vec![orig_msg.encode(), orig_msg2.encode()];

        let rmsg = ResyncMsg::new(&encoded);
        let encoded_resync_msg: Vec<u8> = rmsg.encode();
        let decoded_resync_msg: ResyncMsg = ResyncMsg::decode(&encoded_resync_msg);

        // Without this check a failed decode (empty batch) would pass silently.
        let messages = decoded_resync_msg.get_messages();
        assert_eq!(decoded_resync_msg.len(), 2);
        assert_eq!(messages.len(), 2);

        let expected = [&orig_msg, &orig_msg2];
        for (raw, want) in messages.iter().zip(expected.iter()) {
            let decoded_msg = Msg::decode(raw);
            assert_eq!(decoded_msg.uid, want.uid);
            assert_eq!(decoded_msg.channel, want.channel);
            assert_eq!(decoded_msg.message, want.message);
        }
    }

    #[test]
    fn test_set_get_msg() {
        let uid = "User".to_string();
        let channel = "Channel".to_string();
        let msg = "a test msg".to_string().into_bytes();
        let orig_msg = Msg::new("".to_string(), channel.to_string(), Vec::new())
            .set_uid(uid.clone())
            .set_channel(channel.clone())
            .set_message(msg.clone());
        assert_eq!(&uid, orig_msg.get_uid());
        assert_eq!(&channel, orig_msg.get_channel());
        assert_eq!(&msg, orig_msg.get_message());
    }

    #[test]
    fn test_set_get_mut_msg() {
        let uid = "User".to_string();
        let channel = "Channel".to_string();
        let omsg = "a test ".to_string().into_bytes();
        let nmsg = "a test mut msg".to_string().into_bytes();
        let mut orig_msg = Msg::new("".to_string(), channel.to_string(), omsg)
            .set_uid(uid.clone())
            .set_channel(channel.clone());
        orig_msg
            .get_mut_message()
            .extend_from_slice(&"mut msg".to_string().into_bytes());
        assert_eq!(&uid, orig_msg.get_uid());
        assert_eq!(&channel, orig_msg.get_channel());
        assert_eq!(&nmsg, orig_msg.get_message());
    }

    #[test]
    fn test_cid() {
        let orig_key = 0xffeffe;
        let hdrv = write_hdr_with_key(64, orig_key);
        let orig_len = hdrv.len();
        assert_eq!(orig_key, read_key_from_hdr(&hdrv));
        assert_eq!(orig_key as u32, read_cid_from_hdr(&hdrv));
        // Reading must not consume or alter the buffer.
        assert_eq!(orig_key, read_key_from_hdr(&hdrv));
        assert_eq!(orig_len, hdrv.len());
    }

    #[test]
    fn test_msgconn_send_read() {
        let addr = parse_addr("127.0.0.1:8078");
        let child = start_server(addr, None, 100);

        // Send first, then connect a reader and expect the stored message.
        send_and_close(addr, UID2, GREETING);
        let received = read_and_close(open(addr, UID));
        assert_eq!(GREETING, received);

        drop(child);
    }

    #[test]
    fn test_msgconn_read_send() {
        let addr = parse_addr("127.0.0.1:8076");
        let child = start_server(addr, None, 100);

        // Connect the reader first, then send from a second connection.
        let conn = open(addr, UID);
        send_and_close(addr, UID2, GREETING);
        let received = read_and_close(conn);
        assert_eq!(GREETING, received);

        drop(child);
    }

    #[test]
    fn test_msgconn_peer_send_read() {
        let addr = parse_addr("127.0.0.1:8075");
        let paddr = parse_addr("127.0.0.1:8074");
        let child = start_server(addr, None, 100);
        let pchild = start_server(paddr, Some(addr), 100);

        // Both clients talk to the server that was started with a peer.
        send_and_close(paddr, UID, GREETING);
        let received = read_and_close(open(paddr, UID2));
        assert_eq!(GREETING, received);

        drop(pchild);
        drop(child);
    }

    #[test]
    fn test_msgconn_peer_read_send() {
        let addr = parse_addr("127.0.0.1:8073");
        let paddr = parse_addr("127.0.0.1:8072");
        let child = start_server(addr, None, 100);
        let pchild = start_server(paddr, Some(addr), 100);

        let conn = open(paddr, UID);
        send_and_close(paddr, UID2, GREETING);
        let received = read_and_close(conn);
        assert_eq!(GREETING, received);

        drop(pchild);
        drop(child);
    }

    #[test]
    fn test_msgconn_basic_read_send() {
        let addr = parse_addr("127.0.0.1:8071");
        let serv = start_server(addr, None, 0);

        // The reader runs on its own thread and blocks until the message arrives.
        let child = thread::spawn(move || {
            let received = read_and_close(open(addr, UID2));
            assert_eq!(GREETING, received);
        });
        thread::sleep(Duration::new(1, 0));

        send_and_close(addr, UID, GREETING);

        // Propagate a failed assertion from the reader thread instead of ignoring it.
        child.join().expect("reader thread panicked");

        drop(serv);
    }
}