goxoy_socket_client/
socket_client.rs

1#![allow(warnings, unused)]
2use mpsc::TryRecvError;
3use std::{
4    io::{self, ErrorKind, Read, Write},
5    net::TcpStream,
6    sync::mpsc::{self, Receiver, Sender},
7    thread,
8    time::{Duration,Instant},
9};
10use goxoy_address_parser::address_parser::*;
11
12pub enum SocketClientErrorType {
13    Connection,
14    Communication,
15}
16pub enum SocketConnectionStatus {
17    Connected,
18    Disconnected,
19}
20pub struct SocketClient {
21    defined: bool,
22    tx:Option<Sender<Vec<u8>>>,
23    max_message_size:usize,
24    local_addr: String,
25    fn_received: Option<fn(String,Vec<u8>)>,
26    fn_error: Option<fn(SocketClientErrorType)>,
27    fn_status: Option<fn(SocketConnectionStatus)>,
28}
29
30impl SocketClient {
31    pub fn new() -> Self {
32        SocketClient {
33            defined: false,
34            tx:None,
35            local_addr: String::new(),
36            max_message_size:1024,
37            fn_error: None,
38            fn_received: None,
39            fn_status: None,
40        }
41    }
42    pub fn new_with_config(config: AddressParser) -> Self {
43        SocketClient {
44            defined: true,
45            tx:None,
46            local_addr: AddressParser::object_to_string(config),
47            max_message_size:1024,
48            fn_error: None,
49            fn_received: None,
50            fn_status: None,
51        }
52    }
53    pub fn on_received(&mut self, on_received_callback: fn(String,Vec<u8>)) {
54        self.fn_received = Some(on_received_callback);
55    }
56    pub fn on_connection_status(&mut self, on_connection_status: fn(SocketConnectionStatus)) {
57        self.fn_status = Some(on_connection_status);
58    }
59    pub fn on_error(&mut self, on_error_callback: fn(SocketClientErrorType)) {
60        self.fn_error = Some(on_error_callback);
61    }
62    pub fn connect_with_config(&mut self, config: AddressParser) -> bool {
63        let local_addr = AddressParser::object_to_string(config);
64        self.local_addr = local_addr;
65        self.defined = true;
66        self.connect_sub_fn()
67    }
68    pub fn connect(&mut self) -> bool {
69        if self.defined == false {
70            false
71        } else {
72            self.connect_sub_fn();
73            return true;
74        }
75    }
76    fn connect_sub_fn(&mut self) -> bool {
77        let msg_size=self.max_message_size;
78        let addr_obj = AddressParser::string_to_object(self.local_addr.clone());
79        let local_addr_obj_1=addr_obj.clone();
80        let mut client_obj = TcpStream::connect(AddressParser::local_addr_for_binding(addr_obj));
81        if client_obj.is_err(){
82            return false;
83        }
84        
85        let mut client=client_obj.unwrap();
86        client
87            .set_nonblocking(true)
88            .expect("failed to initiate non-blocking");
89    
90        let (tx, rx) = mpsc::channel::<Vec<u8>>();
91        self.tx=Some(tx.clone());
92        let fn_received_clone=self.fn_received;
93        let fn_error_clone=self.fn_error;
94        thread::spawn(move || loop {
95            let mut buff = vec![0; msg_size];
96            match client.read(&mut buff) {
97                Ok(_) => {
98                    let peer_addr=client.peer_addr();
99                    let mut local_addr=String::from("0.0.0.0:0");
100                    if peer_addr.is_ok(){
101                        local_addr=AddressParser::binding_addr_to_string(
102                            peer_addr.unwrap().to_string(), 
103                            local_addr_obj_1.protocol_type, 
104                            local_addr_obj_1.ip_version
105                        );
106                    }
107                    let msg = buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
108                    if fn_received_clone.is_some() {
109                        fn_received_clone.unwrap()(local_addr,msg.to_vec());
110                    }
111                }
112                Err(ref err) if err.kind() == ErrorKind::WouldBlock => {
113                    /*
114                    if fn_error_clone.is_some() {
115                        fn_error_clone.unwrap()(SocketClientErrorType::Communication);
116                    }
117                    */
118                },
119                Err(_) => {
120                    println!("connection with server was severed");
121                    break;
122                }
123            }
124            match rx.try_recv() {
125                Ok(msg) => {
126                    client.write_all(&msg).expect("writing to socket failed");
127                    println!("message sent {:?}", msg);
128                }
129                Err(TryRecvError::Empty) => {
130                    
131                },
132                Err(TryRecvError::Disconnected) => {
133                    if fn_error_clone.is_some() {
134                        fn_error_clone.unwrap()(SocketClientErrorType::Connection);
135                    }
136                },
137            }
138    
139            thread::sleep(Duration::from_millis(10));
140        });        
141        return true;
142    }
143    pub fn send(&mut self, data: Vec<u8>) -> bool {
144        if self.tx.is_some(){
145            self.tx.as_mut().unwrap().send(data);
146            return true;
147        }
148        return false;
149    }
150
151}
152
153#[test]
154fn full_test() {
155    // cargo test  --lib full_test -- --nocapture
156    let mut client_obj = SocketClient::new_with_config(AddressParser {
157        ip_address: "127.0.0.1".to_string(),
158        port_no: 1234,
159        protocol_type: ProtocolType::TCP,
160        ip_version: IPAddressVersion::IpV4,
161    });
162    client_obj.on_received(|sender,income_data| {
163        println!(
164            "Data Received from : {} => {}",
165            sender,String::from_utf8(income_data.clone()).unwrap()
166        );
167    });
168    client_obj.on_connection_status(|connection_status| match connection_status {
169        SocketConnectionStatus::Connected => {
170            println!("Socket Connected");
171        }
172        SocketConnectionStatus::Disconnected => {
173            println!("Socket Disconnected");
174        }
175    });
176    client_obj.on_error(|error_type| match error_type {
177        SocketClientErrorType::Connection => {
178            println!("Connection Error");
179        }
180        SocketClientErrorType::Communication => {
181            println!("Communication Error");
182        }
183    });
184
185    let mut since_the_epoch = std::time::SystemTime::now()
186        .duration_since(std::time::UNIX_EPOCH)
187        .unwrap()
188        .subsec_nanos();
189    loop {
190        if since_the_epoch >= 1_048_575 {
191            since_the_epoch = since_the_epoch / 2;
192        } else {
193            break;
194        }
195    }
196    if client_obj.connect() {
197        let client_id_str = format!("{:0>5}", since_the_epoch.to_string());
198        println!("CTRL+C to Exit");
199        let mut test_data = String::from("message from => ");
200        test_data.push_str(&client_id_str);
201        client_obj.send(test_data.as_bytes().to_vec());
202        let mut count = 1;
203        loop {
204            let result_obj = client_obj.send(test_data.as_bytes().to_vec());
205            if result_obj == true {
206                println!("Message Sended");
207            } else {
208                println!("Message Sending Error");
209            }
210            thread::sleep(::std::time::Duration::from_millis(30000));
211            count = count + 1;
212            if count > 1_000 {
213                break;
214            }
215        }
216    } else {
217        println!("Not Connected To Server");
218    }
219    assert!(true)
220}