1use super::*;
2
3pub struct InnerClient<T> {
4 sender: Sender<T>,
5 t: PhantomData<T>,
6}
7
8type ProtectedClient<T> = Arc<Mutex<InnerClient<T>>>;
9#[derive(Clone)]
10pub struct Client<T>(pub ProtectedClient<T>);
11
12impl<T: KnetTransform + Send + Clone + Sync + Debug + 'static> Client<T> {
13 pub async fn run(addr: impl ToSocketAddrs) -> Result<(Self, Receiver<T>)> {
19 let stream = TcpStream::connect(addr).await?;
20 let stream = Arc::new(stream);
21 let (sender_event, receiver_event) = mpsc::unbounded::<T>();
22 let (sender, receiver) = mpsc::unbounded::<T>();
23 let (sender_shutdown, receiver_shutdown) = mpsc::unbounded::<()>();
24 let client = Client(Arc::new(Mutex::new(InnerClient {
25 sender,
26 t: PhantomData,
27 })));
28 spawn_and_log_error(Client::loop_read(
29 client.0.clone(),
30 Arc::clone(&stream),
31 sender_event,
32 sender_shutdown,
33 ));
34 spawn_and_log_error(Client::connection_writer_loop(
35 receiver,
36 stream,
37 receiver_shutdown,
38 ));
39 Ok((client, receiver_event))
40 }
41
42 pub async fn write(client: ProtectedClient<T>, data: T) -> Result<()> {
44 info!("dota dead");
45 client.lock().await.sender.send(data).await?;
46 Ok(())
47 }
48
49 async fn loop_read(
54 client: ProtectedClient<T>,
55 stream: Arc<TcpStream>,
56 mut sender_event: Sender<T>,
57 _sender_shutdown: Sender<()>,
58 ) -> Result<()> {
59 let mut reader = BufReader::new(&*stream);
60 let size_payload = T::get_size_of_payload();
61 loop {
62 client.lock().await;
63 let mut vector_payload = vec![0u8; size_payload];
64 let buffer_payload = vector_payload[..].as_mut();
65 match reader.read_exact(buffer_payload).await {
66 Ok(_) => {
67 info!("recv from server {:?}", buffer_payload);
68 let size_data = T::get_size_of_data(&buffer_payload);
69 let mut vector_data = vec![0u8; size_data];
70 let buffer_data = vector_data[..].as_mut();
71
72 reader.read_exact(buffer_data).await?;
73 vector_payload.extend_from_slice(&buffer_data);
74 let data = T::from_raw(&vector_payload);
75 sender_event.send(data).await?;
76 }
77 Err(e) => {
78 error!("error in client : {}", e);
79 break Ok(());
80 }
81 }
82 }
83 }
84
85 async fn connection_writer_loop(
86 mut messages: Receiver<T>,
87 stream: Arc<TcpStream>,
88 mut shutdown: Receiver<()>,
89 ) -> Result<()> {
90 let mut stream = &*stream;
91 loop {
92 select! {
93 msg = messages.next().fuse() => match msg {
94 Some(ref data) =>{
95 info!("connection_writer_loop msg {:?}", msg);
96 stream.write_all(&data.serialize()[..]).await?
97 },
98 None => break,
99 },
100 void = shutdown.next().fuse() => match void {
101 Some(_) => {
102 info!("connection_writer_loop void");
103 },
104 None => {
105 info!("connection_writer_loop void toi");
106 break;
107 },
108 }
109 }
110 }
111 Ok(())
112 }
113}