gmt_dos_clients_transceiver/
receiver.rs

1use std::{any::type_name, marker::PhantomData, net::SocketAddr, time::Instant};
2
3use interface::{Data, UniqueIdentifier};
4use quinn::Endpoint;
5use tracing::{debug, error, info};
6
7use crate::{Crypto, Monitor, On, Receiver, Transceiver, TransceiverError};
8
9impl<U: UniqueIdentifier> Transceiver<U> {
10    /// [Transceiver] receiver functionality
11    ///
12    /// A receiver is build from both the transmitter and the receiver internet socket addresses
13    ///
14    /// # Examples
15    ///
16    /// ```
17    /// use gmt_dos_clients_transceiver::Transceiver;
18    /// use interface::UID;
19    /// #[derive(UID)]
20    /// #[uid(port = 5001)]
21    /// pub enum IO {}
22    /// let tx_address = "127.0.0.1";
23    /// let rx_address = "127.0.0.1:0";
24    /// let rx = Transceiver::<IO>::receiver(tx_address,rx_address);
25    /// ```
26    pub fn receiver<S: Into<String>, C: Into<String>>(
27        server_address: S,
28        client_address: C,
29    ) -> crate::Result<Transceiver<U, Receiver>> {
30        ReceiverBuilder {
31            server_address: server_address.into(),
32            client_address: client_address.into(),
33            crypto: Default::default(),
34            uid: PhantomData,
35        }
36        .build()
37    }
38    pub fn receiver_builder<S: Into<String>, C: Into<String>>(
39        server_address: S,
40        client_address: C,
41    ) -> ReceiverBuilder<U> {
42        ReceiverBuilder {
43            server_address: server_address.into(),
44            client_address: client_address.into(),
45            crypto: Default::default(),
46            uid: PhantomData,
47        }
48    }
49}
50impl<U: UniqueIdentifier> Transceiver<U, Receiver> {
51    /// Spawn a new [Transceiver] receiver
52    ///
53    /// clone the receiver endpoint that will
54    /// `server_address` to connect to
55    pub fn spawn<V: UniqueIdentifier, A: Into<String>>(
56        &self,
57        server_address: A,
58    ) -> crate::Result<Transceiver<V, Receiver>> {
59        let Self {
60            endpoint, crypto, ..
61        } = &self;
62        let (tx, rx) = flume::unbounded();
63        Ok(Transceiver::<V, Receiver> {
64            crypto: crypto.clone(),
65            endpoint: endpoint.clone(),
66            server_address: server_address.into(),
67            tx: Some(tx),
68            rx: Some(rx),
69            function: PhantomData,
70            state: PhantomData,
71        })
72    }
73}
74
75#[cfg(feature = "flate2")]
76fn decode<U>(bytes: &[u8]) -> crate::Result<((String, Option<Vec<Data<U>>>), usize)>
77where
78    U: UniqueIdentifier,
79    <U as UniqueIdentifier>::DataType: Send + Sync + for<'a> serde::Deserialize<'a>,
80{
81    use flate2::read::DeflateDecoder;
82    let mut deflater = DeflateDecoder::new(bytes);
83    let data = bincode::serde::decode_from_std_read::<(String, Option<Vec<Data<U>>>), _, _>(
84        &mut deflater,
85        bincode::config::standard(),
86    )?;
87    Ok(((data), 0))
88}
89#[cfg(not(feature = "flate2"))]
90fn decode<U>(bytes: &[u8]) -> crate::Result<((String, Option<Vec<Data<U>>>), usize)>
91where
92    U: UniqueIdentifier,
93    <U as UniqueIdentifier>::DataType: Send + Sync + for<'a> serde::Deserialize<'a>,
94{
95    Ok(bincode::serde::decode_from_slice::<
96        (String, Option<Vec<Data<U>>>),
97        _,
98    >(bytes, bincode::config::standard())?)
99}
100
101impl<U: UniqueIdentifier + 'static> Transceiver<U, Receiver> {
102    /// Receive data from the transmitter
103    ///
104    /// Communication with the transmitter happens in a separate thread.
105    /// The receiver will timed-out after 10s if no connection can be established
106    /// with the transmitter
107    pub fn run(self, monitor: &mut Monitor) -> Transceiver<U, Receiver, On>
108    where
109        <U as UniqueIdentifier>::DataType: Send + Sync + for<'a> serde::Deserialize<'a>,
110    {
111        let Self {
112            crypto,
113            mut endpoint,
114            server_address,
115            mut tx,
116            rx,
117            function,
118            ..
119        } = self;
120        let endpoint = endpoint.take().unwrap();
121        let tx = tx.take().unwrap();
122        let address = SocketAddr::new(server_address.parse().unwrap(),U::PORT as u16);
123        let server_name: String = crypto.name.clone();
124        let name = crate::trim(type_name::<U>());
125        let handle = tokio::spawn(async move {
126            let stream = endpoint.connect(address, &server_name)?;
127            let connection = stream.await.map_err(|e| {
128                println!("{name} receiver connection: {e}");
129                e
130            })?;
131            info!(
132                "<{}>: incoming connection: {}",
133                name,
134                connection.remote_address()
135            );
136            let mut n_byte = 0;
137            let now = Instant::now();
138            loop {
139                match connection.accept_uni().await {
140                    Ok(mut recv) => {
141                        // receiving data from transmitter
142                        let bytes = recv.read_to_end(1_000_000_000).await?;
143                        n_byte += bytes.len();
144                        debug!("{} bytes received", bytes.len());
145                        // decoding data
146                        match decode(bytes.as_slice()) {
147                            // received some data from transmitter and sending to client
148                            Ok(((tag, Some(data_packet)), _n)) if tag.as_str() == name => {
149                                debug!(" forwarding data");
150                                for data in data_packet {
151                                    let _ = tx.send(data);
152                                }
153                            }
154                            // received none and closing receiver
155                            Ok(((tag, None), _)) if tag.as_str() == name => {
156                                debug!("<{name}>: data stream ended");
157                                let elapsed = now.elapsed();
158                                let rate = n_byte as f64 / elapsed.as_secs_f64();
159                                break Err(TransceiverError::StreamEnd(
160                                    name.clone(),
161                                    bytesize::ByteSize::b(n_byte as u64).to_string(),
162                                    humantime::format_duration(now.elapsed()).to_string(),
163                                    bytesize::ByteSize::b(rate as u64).to_string(),
164                                ));
165                            }
166                            Ok(((tag, _), _)) => {
167                                error!("<{name}>: expected {name}, received {tag}");
168                                break Err(TransceiverError::DataMismatch(name.clone(), tag));
169                            }
170                            // decoding failure
171                            Err(e) => {
172                                error!("<{name}>: deserializing failed");
173                                break Err(TransceiverError::Decode(e.to_string()));
174                            }
175                        }
176                    }
177                    Err(e) => {
178                        error!("<{name}>: connection with {address} lost");
179                        break Err(TransceiverError::ConnectionError(e));
180                    }
181                }
182            }
183            .or_else(|e| {
184                info!("<{}>: disconnected ({})", &name, e);
185                drop(tx);
186                match e {
187                    TransceiverError::StreamEnd(..) => {
188                        info!("{e}");
189                        Ok(())
190                    }
191                    _ => Err(e),
192                }
193            })
194        });
195
196        monitor.push(handle);
197        Transceiver::<U, Receiver, On> {
198            crypto,
199            endpoint: None,
200            server_address,
201            tx: None,
202            rx,
203            function,
204            state: PhantomData,
205        }
206    }
207}
208
209#[derive(Debug)]
210pub struct ReceiverBuilder<U: UniqueIdentifier> {
211    server_address: String,
212    client_address: String,
213    crypto: Option<Crypto>,
214    uid: PhantomData<U>,
215}
216impl<U: UniqueIdentifier> ReceiverBuilder<U> {
217    pub fn crypto(mut self, crypto: Crypto) -> Self {
218        self.crypto = Some(crypto);
219        self
220    }
221    pub fn build(self) -> crate::Result<Transceiver<U, Receiver>> {
222        let crypto = self.crypto.unwrap_or_default();
223        let client_config = crypto.client()?;
224        let address = self.client_address.parse::<SocketAddr>()?;
225        let mut endpoint = Endpoint::client(address)?;
226        endpoint.set_default_client_config(client_config);
227        Ok(Transceiver::new(
228            crypto,
229            self.server_address,
230            endpoint,
231            crate::InnerChannel::Unbounded,
232        ))
233    }
234}
235
236pub struct CompactRecvr {
237    crypto: Crypto,
238    endpoint: Option<quinn::Endpoint>,
239}
240impl<U: UniqueIdentifier> From<&Transceiver<U, Receiver>> for CompactRecvr {
241    fn from(value: &Transceiver<U, Receiver>) -> Self {
242        let Transceiver::<U, Receiver> {
243            crypto, endpoint, ..
244        } = value;
245        Self {
246            crypto: crypto.clone(),
247            endpoint: endpoint.clone(),
248        }
249    }
250}
251
252impl CompactRecvr {
253    /// Spawn a new [Transceiver] receiver
254    ///
255    /// clone the receiver endpoint that will
256    /// `server_address` to connect to
257    pub fn spawn<V: UniqueIdentifier, A: Into<String>>(
258        &self,
259        server_address: A,
260    ) -> crate::Result<Transceiver<V, Receiver>> {
261        let Self {
262            endpoint, crypto, ..
263        } = &self;
264        let (tx, rx) = flume::unbounded();
265        Ok(Transceiver::<V, Receiver> {
266            crypto: crypto.clone(),
267            endpoint: endpoint.clone(),
268            server_address: server_address.into(),
269            tx: Some(tx),
270            rx: Some(rx),
271            function: PhantomData,
272            state: PhantomData,
273        })
274    }
275}