goxoy_socket_client/
socket_client.rs1#![allow(warnings, unused)]
2use mpsc::TryRecvError;
3use std::{
4 io::{self, ErrorKind, Read, Write},
5 net::TcpStream,
6 sync::mpsc::{self, Receiver, Sender},
7 thread,
8 time::{Duration,Instant},
9};
10use goxoy_address_parser::address_parser::*;
11
12pub enum SocketClientErrorType {
13 Connection,
14 Communication,
15}
16pub enum SocketConnectionStatus {
17 Connected,
18 Disconnected,
19}
20pub struct SocketClient {
21 defined: bool,
22 tx:Option<Sender<Vec<u8>>>,
23 max_message_size:usize,
24 local_addr: String,
25 fn_received: Option<fn(String,Vec<u8>)>,
26 fn_error: Option<fn(SocketClientErrorType)>,
27 fn_status: Option<fn(SocketConnectionStatus)>,
28}
29
30impl SocketClient {
31 pub fn new() -> Self {
32 SocketClient {
33 defined: false,
34 tx:None,
35 local_addr: String::new(),
36 max_message_size:1024,
37 fn_error: None,
38 fn_received: None,
39 fn_status: None,
40 }
41 }
42 pub fn new_with_config(config: AddressParser) -> Self {
43 SocketClient {
44 defined: true,
45 tx:None,
46 local_addr: AddressParser::object_to_string(config),
47 max_message_size:1024,
48 fn_error: None,
49 fn_received: None,
50 fn_status: None,
51 }
52 }
53 pub fn on_received(&mut self, on_received_callback: fn(String,Vec<u8>)) {
54 self.fn_received = Some(on_received_callback);
55 }
56 pub fn on_connection_status(&mut self, on_connection_status: fn(SocketConnectionStatus)) {
57 self.fn_status = Some(on_connection_status);
58 }
59 pub fn on_error(&mut self, on_error_callback: fn(SocketClientErrorType)) {
60 self.fn_error = Some(on_error_callback);
61 }
62 pub fn connect_with_config(&mut self, config: AddressParser) -> bool {
63 let local_addr = AddressParser::object_to_string(config);
64 self.local_addr = local_addr;
65 self.defined = true;
66 self.connect_sub_fn()
67 }
68 pub fn connect(&mut self) -> bool {
69 if self.defined == false {
70 false
71 } else {
72 self.connect_sub_fn();
73 return true;
74 }
75 }
76 fn connect_sub_fn(&mut self) -> bool {
77 let msg_size=self.max_message_size;
78 let addr_obj = AddressParser::string_to_object(self.local_addr.clone());
79 let local_addr_obj_1=addr_obj.clone();
80 let mut client_obj = TcpStream::connect(AddressParser::local_addr_for_binding(addr_obj));
81 if client_obj.is_err(){
82 return false;
83 }
84
85 let mut client=client_obj.unwrap();
86 client
87 .set_nonblocking(true)
88 .expect("failed to initiate non-blocking");
89
90 let (tx, rx) = mpsc::channel::<Vec<u8>>();
91 self.tx=Some(tx.clone());
92 let fn_received_clone=self.fn_received;
93 let fn_error_clone=self.fn_error;
94 thread::spawn(move || loop {
95 let mut buff = vec![0; msg_size];
96 match client.read(&mut buff) {
97 Ok(_) => {
98 let peer_addr=client.peer_addr();
99 let mut local_addr=String::from("0.0.0.0:0");
100 if peer_addr.is_ok(){
101 local_addr=AddressParser::binding_addr_to_string(
102 peer_addr.unwrap().to_string(),
103 local_addr_obj_1.protocol_type,
104 local_addr_obj_1.ip_version
105 );
106 }
107 let msg = buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
108 if fn_received_clone.is_some() {
109 fn_received_clone.unwrap()(local_addr,msg.to_vec());
110 }
111 }
112 Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
113 },
119 Err(_) => {
120 println!("connection with server was severed");
121 break;
122 }
123 }
124 match rx.try_recv() {
125 Ok(msg) => {
126 client.write_all(&msg).expect("writing to socket failed");
127 println!("message sent {:?}", msg);
128 }
129 Err(TryRecvError::Empty) => {
130
131 },
132 Err(TryRecvError::Disconnected) => {
133 if fn_error_clone.is_some() {
134 fn_error_clone.unwrap()(SocketClientErrorType::Connection);
135 }
136 },
137 }
138
139 thread::sleep(Duration::from_millis(10));
140 });
141 return true;
142 }
143 pub fn send(&mut self, data: Vec<u8>) -> bool {
144 if self.tx.is_some(){
145 self.tx.as_mut().unwrap().send(data);
146 return true;
147 }
148 return false;
149 }
150
151}
152
153#[test]
154fn full_test() {
155 let mut client_obj = SocketClient::new_with_config(AddressParser {
157 ip_address: "127.0.0.1".to_string(),
158 port_no: 1234,
159 protocol_type: ProtocolType::TCP,
160 ip_version: IPAddressVersion::IpV4,
161 });
162 client_obj.on_received(|sender,income_data| {
163 println!(
164 "Data Received from : {} => {}",
165 sender,String::from_utf8(income_data.clone()).unwrap()
166 );
167 });
168 client_obj.on_connection_status(|connection_status| match connection_status {
169 SocketConnectionStatus::Connected => {
170 println!("Socket Connected");
171 }
172 SocketConnectionStatus::Disconnected => {
173 println!("Socket Disconnected");
174 }
175 });
176 client_obj.on_error(|error_type| match error_type {
177 SocketClientErrorType::Connection => {
178 println!("Connection Error");
179 }
180 SocketClientErrorType::Communication => {
181 println!("Communication Error");
182 }
183 });
184
185 let mut since_the_epoch = std::time::SystemTime::now()
186 .duration_since(std::time::UNIX_EPOCH)
187 .unwrap()
188 .subsec_nanos();
189 loop {
190 if since_the_epoch >= 1_048_575 {
191 since_the_epoch = since_the_epoch / 2;
192 } else {
193 break;
194 }
195 }
196 if client_obj.connect() {
197 let client_id_str = format!("{:0>5}", since_the_epoch.to_string());
198 println!("CTRL+C to Exit");
199 let mut test_data = String::from("message from => ");
200 test_data.push_str(&client_id_str);
201 client_obj.send(test_data.as_bytes().to_vec());
202 let mut count = 1;
203 loop {
204 let result_obj = client_obj.send(test_data.as_bytes().to_vec());
205 if result_obj == true {
206 println!("Message Sended");
207 } else {
208 println!("Message Sending Error");
209 }
210 thread::sleep(::std::time::Duration::from_millis(30000));
211 count = count + 1;
212 if count > 1_000 {
213 break;
214 }
215 }
216 } else {
217 println!("Not Connected To Server");
218 }
219 assert!(true)
220}