Skip to main content

knet/network/
client.rs

1use super::*;
2
3pub struct InnerClient<T> {
4    sender: Sender<T>,
5    t: PhantomData<T>,
6}
7
8type ProtectedClient<T> = Arc<Mutex<InnerClient<T>>>;
9#[derive(Clone)]
10pub struct Client<T>(pub ProtectedClient<T>);
11
12impl<T: KnetTransform + Send + Clone + Sync + Debug + 'static> Client<T> {
13    /* ******************************************************* */
14    /*           Public                                        */
15    /* ******************************************************* */
16
17    ///Run the server on the `addr` adress and return and the server and Receiver 
18    pub async fn run(addr: impl ToSocketAddrs) -> Result<(Self, Receiver<T>)> {
19        let stream = TcpStream::connect(addr).await?;
20        let stream = Arc::new(stream);
21        let (sender_event, receiver_event) = mpsc::unbounded::<T>();
22        let (sender, receiver) = mpsc::unbounded::<T>();
23        let (sender_shutdown, receiver_shutdown) = mpsc::unbounded::<()>();
24        let client = Client(Arc::new(Mutex::new(InnerClient {
25            sender,
26            t: PhantomData,
27        })));
28        spawn_and_log_error(Client::loop_read(
29            client.0.clone(),
30            Arc::clone(&stream),
31            sender_event,
32            sender_shutdown,
33        ));
34        spawn_and_log_error(Client::connection_writer_loop(
35            receiver,
36            stream,
37            receiver_shutdown,
38        ));
39        Ok((client, receiver_event))
40    }
41
42    ///Write the data to the server
43    pub async fn write(client: ProtectedClient<T>, data: T) -> Result<()> {
44        info!("dota dead");
45        client.lock().await.sender.send(data).await?;
46        Ok(())
47    }
48
49    /* ******************************************************* */
50    /*           Private                                       */
51    /* ******************************************************* */
52
53    async fn loop_read(
54        client: ProtectedClient<T>,
55        stream: Arc<TcpStream>,
56        mut sender_event: Sender<T>,
57        _sender_shutdown: Sender<()>,
58    ) -> Result<()> {
59        let mut reader = BufReader::new(&*stream);
60        let size_payload = T::get_size_of_payload();
61        loop {
62            client.lock().await;
63            let mut vector_payload = vec![0u8; size_payload];
64            let buffer_payload = vector_payload[..].as_mut();
65            match reader.read_exact(buffer_payload).await {
66                Ok(_) => {
67                    info!("recv from server {:?}", buffer_payload);
68                    let size_data = T::get_size_of_data(&buffer_payload);
69                    let mut vector_data = vec![0u8; size_data];
70                    let buffer_data = vector_data[..].as_mut();
71
72                    reader.read_exact(buffer_data).await?;
73                    vector_payload.extend_from_slice(&buffer_data);
74                    let data = T::from_raw(&vector_payload);
75                    sender_event.send(data).await?;
76                }
77                Err(e) => {
78                    error!("error in client : {}", e);
79                    break Ok(());
80                }
81            }
82        }
83    }
84
85    async fn connection_writer_loop(
86        mut messages: Receiver<T>,
87        stream: Arc<TcpStream>,
88        mut shutdown: Receiver<()>,
89    ) -> Result<()> {
90        let mut stream = &*stream;
91        loop {
92            select! {
93                msg = messages.next().fuse() => match msg {
94                    Some(ref data) =>{
95                        info!("connection_writer_loop msg {:?}", msg);
96                        stream.write_all(&data.serialize()[..]).await?
97                    },
98                    None => break,
99                },
100                void = shutdown.next().fuse() => match void {
101                    Some(_) => {
102                        info!("connection_writer_loop void");
103                    },
104                    None => {
105                        info!("connection_writer_loop void toi");
106                        break;
107                    },
108                }
109            }
110        }
111        Ok(())
112    }
113}