goxoy_socket_server/
socket_server.rs

1//#![allow(warnings, unused)]
2use std::{
3    io::{ErrorKind, Read, Write},
4    net::TcpListener,
5    sync::mpsc::{self},
6    thread,
7};
8use std::sync::mpsc::Sender;
9
10use goxoy_address_parser::address_parser::{AddressParser, IPAddressVersion, ProtocolType};
11
/// Category of failure handed to the callback registered with
/// `SocketServer::on_error`.
///
/// The type is a plain tag without payload, so it is cheap to copy and can be
/// compared directly (`==`) inside an error callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SocketServerErrorType {
    /// Failure while starting the socket.
    SocketStartingError,
    /// Failure while sending data.
    DataSendingError,
    /// Connection-level failure, e.g. the listening address could not be bound.
    Connection,
    /// Failure while communicating with a peer.
    Communication,
}
/// Connection status tag.
///
/// Carries no payload, so it is `Copy` and directly comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SocketServerStatus {
    /// A connection is established.
    Connected,
    /// No connection is established.
    Disconnected,
}
24
25/*
26#[derive(Clone, Debug)]
27pub struct SocketServerMessageList {
28    peer_addr:String,
29    data:Vec<u8>
30}
31*/
32
33#[derive( Debug)]
34pub struct SocketServer {
35    url: String,
36    local_addr_obj:AddressParser,
37    started: bool,
38    defined: bool,
39    tx:Option<Sender<Vec<u8>>>,
40    pub local_addr: String,
41    fn_receive_data: Option<fn(String,Vec<u8>)>,
42    fn_new_client: Option<fn(String)>,
43    fn_disconnet_client: Option<fn(String)>,
44    fn_error: Option<fn(SocketServerErrorType)>,
45    buffer_size: usize,
46}
47
48impl SocketServer {
49    pub fn new() -> Self {
50        SocketServer {
51            url: String::new(),
52            local_addr: String::new(),
53            local_addr_obj:AddressParser { ip_address: String::new(), port_no: 0, protocol_type: ProtocolType::TCP, ip_version: IPAddressVersion::IpV4 },
54            tx:None,
55            started: false,
56            defined: false,
57            fn_receive_data: None,
58            fn_new_client: None,
59            fn_disconnet_client: None,
60            fn_error: None,
61            buffer_size: 1024,
62        }
63    }
64    pub fn new_with_config(
65        protocol_type: ProtocolType,
66        ip_address: String,
67        port_no: usize,
68        ip_version: IPAddressVersion,
69    ) -> Self {
70        let addr_obj=AddressParser {
71            ip_address,
72            port_no,
73            protocol_type,
74            ip_version,
75        };
76        SocketServer {
77            url: String::new(),
78            local_addr: AddressParser::object_to_string(addr_obj.clone()),
79            local_addr_obj: addr_obj,
80            tx:None,
81            defined: true,
82            started: false,
83            fn_receive_data: None,
84            fn_new_client: None,
85            fn_disconnet_client: None,
86            fn_error: None,
87            buffer_size: 1024,
88        }
89    }
90    pub fn set_config(&mut self, config: AddressParser) {
91        self.local_addr_obj = config.clone();
92        let local_addr = AddressParser::object_to_string(config);
93        self.local_addr = local_addr;
94        self.defined = true;
95    }
96    pub fn set_buffer_size(&mut self, buffer_size: usize) {
97        self.buffer_size = buffer_size;
98    }
99    pub fn on_receive(&mut self, on_receive_data: fn(String,Vec<u8>)) {
100        self.fn_receive_data = Some(on_receive_data);
101    }
102    pub fn on_new_client(&mut self, on_new_client: fn(String)) {
103        self.fn_new_client = Some(on_new_client);
104    }
105    pub fn on_client_disconnect(&mut self, on_disconnet_client: fn(String)) {
106        self.fn_disconnet_client = Some(on_disconnet_client);
107    }
108    pub fn on_error(&mut self, on_error: fn(SocketServerErrorType)) {
109        self.fn_error = Some(on_error);
110    }
111    pub fn send(&mut self,peer_addr:String, data: Vec<u8>)->bool{
112        if self.tx.is_some(){
113            self.tx.as_mut().unwrap().send(data);
114            return true;
115        }
116        false
117    }
118    fn start_udp(&mut self) -> bool {
119        true
120    }
121    fn start_tcp(&mut self) -> bool {
122        let listener = TcpListener::bind(&self.url);
123        if listener.is_err() {
124            if self.fn_error.is_some() {
125                let fn_error_obj = self.fn_error.unwrap();
126                fn_error_obj(SocketServerErrorType::Connection);
127            }
128            return false;
129        }
130
131        let server = listener.unwrap();
132        if server.set_nonblocking(true).is_err(){
133            return false;
134        }
135    
136        let mut clients = vec![];
137        let buffer_size=self.buffer_size;
138        
139        let (tx, rx) = mpsc::channel::<Vec<u8>>();
140        self.tx=Some(tx.clone());
141
142        let on_disconnect_client_fn=self.fn_disconnet_client;
143        let on_receive_data=self.fn_receive_data;
144        let on_new_client_fn=self.fn_new_client;
145        let local_addr_obj_1=self.local_addr_obj.clone();
146        let local_addr_obj_2=self.local_addr_obj.clone();
147
148        loop {
149            if let Ok((mut socket, addr)) = server.accept() {
150                if on_new_client_fn.is_some() {
151                    on_new_client_fn.unwrap()(
152                        AddressParser::binding_addr_to_string(
153                            addr.to_string(), 
154                            local_addr_obj_1.protocol_type, 
155                            local_addr_obj_1.ip_version
156                        )
157                    );
158                }
159                
160                let tx = tx.clone();
161                clients.push(socket.try_clone().expect("failed to clone client"));
162                thread::spawn(move || loop {
163                    let mut buff = vec![0; buffer_size];
164                    match socket.read(&mut buff) {
165                        Ok(_) => {
166                            let msg = buff.into_iter().take_while(|&x| x != 0).collect::<Vec<_>>();
167                            if on_receive_data.is_some(){
168                                on_receive_data.unwrap()(
169                                    AddressParser::binding_addr_to_string(
170                                        addr.to_string(), 
171                                        local_addr_obj_2.protocol_type, 
172                                        local_addr_obj_2.ip_version
173                                    ),
174                                    msg.clone()
175                                );
176                            }
177                            tx.send(msg).expect("failed to send message to rx");
178                        }
179                        Err(ref err) if err.kind() == ErrorKind::WouldBlock => (),
180                        Err(_) => {
181                            if on_disconnect_client_fn.is_some(){
182                                on_disconnect_client_fn.unwrap()(
183                                    AddressParser::binding_addr_to_string(
184                                        addr.to_string(), 
185                                        local_addr_obj_2.protocol_type, 
186                                        local_addr_obj_2.ip_version
187                                    )
188                                );
189                            }
190                            break;
191                        }
192                    }
193                    thread::sleep(::std::time::Duration::from_millis(100));
194                });
195            }
196    
197            if let Ok(msg) = rx.try_recv() {
198                clients = clients
199                    .into_iter()
200                    .filter_map(|mut client| {
201                        //println!("sent data");
202                        client.write_all(&msg).map(|_| client).ok()
203                    })
204                    .collect::<Vec<_>>();
205            }
206            thread::sleep(::std::time::Duration::from_millis(100));
207        }
208        return true;
209    }
210    pub fn start(&mut self) -> bool {
211        if self.defined == false {
212            return false;
213        }
214        self.started = true;
215        let addr_obj = AddressParser::string_to_object(self.local_addr.clone());
216        if addr_obj.protocol_type == ProtocolType::TCP {
217            self.url = AddressParser::local_addr_for_binding(addr_obj);
218            if self.fn_receive_data.is_none() {
219                println!("callback did not define");
220            } else {
221                self.start_tcp();
222            }
223        } else {
224            if self.fn_receive_data.is_none() {
225                println!("callback did not define");
226            } else {
227                self.start_udp();
228            }
229        }
230        return true;
231    }
232}
233
/// Manual smoke test: runs a TCP server on 127.0.0.1:1234 and prints every
/// event to stdout.
///
/// When the port can be bound, `start` stays inside the accept loop, so this
/// test does not finish on its own. It is therefore ignored by default and
/// has to be requested explicitly (and stopped by hand).
#[test]
#[ignore = "binds 127.0.0.1:1234 and blocks in the accept loop; run manually"]
fn full_test() {
    // cargo test --lib full_test -- --ignored --nocapture
    let mut server_obj = SocketServer::new_with_config(
        ProtocolType::TCP,
        "127.0.0.1".to_string(),
        1234,
        IPAddressVersion::IpV4,
    );
    println!("server_obj.local_addr: {}", server_obj.local_addr);

    server_obj.on_receive(|sender, income_data| {
        // Lossy decoding: a non-UTF-8 payload must not panic the reader thread.
        let text = String::from_utf8_lossy(&income_data);
        println!("income callback => {} [ {} ]: {}", sender, income_data.len(), text);
    });
    server_obj.on_new_client(|peer| {
        println!("new client connected : {}", peer);
    });
    server_obj.on_client_disconnect(|peer| {
        println!("client disconnected : {}", peer);
    });
    server_obj.on_error(|data| {
        println!("on error : {:?}", data);
    });

    server_obj.start();
    // Only reached when `start` returned, i.e. the server did not keep running.
    server_obj.send(String::from("127.0.0.1:1234"), "welcome".as_bytes().to_vec());
}