gmt_dos_clients_transceiver/
receiver.rs1use std::{any::type_name, marker::PhantomData, net::SocketAddr, time::Instant};
2
3use interface::{Data, UniqueIdentifier};
4use quinn::Endpoint;
5use tracing::{debug, error, info};
6
7use crate::{Crypto, Monitor, On, Receiver, Transceiver, TransceiverError};
8
9impl<U: UniqueIdentifier> Transceiver<U> {
10 pub fn receiver<S: Into<String>, C: Into<String>>(
27 server_address: S,
28 client_address: C,
29 ) -> crate::Result<Transceiver<U, Receiver>> {
30 ReceiverBuilder {
31 server_address: server_address.into(),
32 client_address: client_address.into(),
33 crypto: Default::default(),
34 uid: PhantomData,
35 }
36 .build()
37 }
38 pub fn receiver_builder<S: Into<String>, C: Into<String>>(
39 server_address: S,
40 client_address: C,
41 ) -> ReceiverBuilder<U> {
42 ReceiverBuilder {
43 server_address: server_address.into(),
44 client_address: client_address.into(),
45 crypto: Default::default(),
46 uid: PhantomData,
47 }
48 }
49}
50impl<U: UniqueIdentifier> Transceiver<U, Receiver> {
51 pub fn spawn<V: UniqueIdentifier, A: Into<String>>(
56 &self,
57 server_address: A,
58 ) -> crate::Result<Transceiver<V, Receiver>> {
59 let Self {
60 endpoint, crypto, ..
61 } = &self;
62 let (tx, rx) = flume::unbounded();
63 Ok(Transceiver::<V, Receiver> {
64 crypto: crypto.clone(),
65 endpoint: endpoint.clone(),
66 server_address: server_address.into(),
67 tx: Some(tx),
68 rx: Some(rx),
69 function: PhantomData,
70 state: PhantomData,
71 })
72 }
73}
74
75#[cfg(feature = "flate2")]
76fn decode<U>(bytes: &[u8]) -> crate::Result<((String, Option<Vec<Data<U>>>), usize)>
77where
78 U: UniqueIdentifier,
79 <U as UniqueIdentifier>::DataType: Send + Sync + for<'a> serde::Deserialize<'a>,
80{
81 use flate2::read::DeflateDecoder;
82 let mut deflater = DeflateDecoder::new(bytes);
83 let data = bincode::serde::decode_from_std_read::<(String, Option<Vec<Data<U>>>), _, _>(
84 &mut deflater,
85 bincode::config::standard(),
86 )?;
87 Ok(((data), 0))
88}
89#[cfg(not(feature = "flate2"))]
90fn decode<U>(bytes: &[u8]) -> crate::Result<((String, Option<Vec<Data<U>>>), usize)>
91where
92 U: UniqueIdentifier,
93 <U as UniqueIdentifier>::DataType: Send + Sync + for<'a> serde::Deserialize<'a>,
94{
95 Ok(bincode::serde::decode_from_slice::<
96 (String, Option<Vec<Data<U>>>),
97 _,
98 >(bytes, bincode::config::standard())?)
99}
100
101impl<U: UniqueIdentifier + 'static> Transceiver<U, Receiver> {
102 pub fn run(self, monitor: &mut Monitor) -> Transceiver<U, Receiver, On>
108 where
109 <U as UniqueIdentifier>::DataType: Send + Sync + for<'a> serde::Deserialize<'a>,
110 {
111 let Self {
112 crypto,
113 mut endpoint,
114 server_address,
115 mut tx,
116 rx,
117 function,
118 ..
119 } = self;
120 let endpoint = endpoint.take().unwrap();
121 let tx = tx.take().unwrap();
122 let address = SocketAddr::new(server_address.parse().unwrap(),U::PORT as u16);
123 let server_name: String = crypto.name.clone();
124 let name = crate::trim(type_name::<U>());
125 let handle = tokio::spawn(async move {
126 let stream = endpoint.connect(address, &server_name)?;
127 let connection = stream.await.map_err(|e| {
128 println!("{name} receiver connection: {e}");
129 e
130 })?;
131 info!(
132 "<{}>: incoming connection: {}",
133 name,
134 connection.remote_address()
135 );
136 let mut n_byte = 0;
137 let now = Instant::now();
138 loop {
139 match connection.accept_uni().await {
140 Ok(mut recv) => {
141 let bytes = recv.read_to_end(1_000_000_000).await?;
143 n_byte += bytes.len();
144 debug!("{} bytes received", bytes.len());
145 match decode(bytes.as_slice()) {
147 Ok(((tag, Some(data_packet)), _n)) if tag.as_str() == name => {
149 debug!(" forwarding data");
150 for data in data_packet {
151 let _ = tx.send(data);
152 }
153 }
154 Ok(((tag, None), _)) if tag.as_str() == name => {
156 debug!("<{name}>: data stream ended");
157 let elapsed = now.elapsed();
158 let rate = n_byte as f64 / elapsed.as_secs_f64();
159 break Err(TransceiverError::StreamEnd(
160 name.clone(),
161 bytesize::ByteSize::b(n_byte as u64).to_string(),
162 humantime::format_duration(now.elapsed()).to_string(),
163 bytesize::ByteSize::b(rate as u64).to_string(),
164 ));
165 }
166 Ok(((tag, _), _)) => {
167 error!("<{name}>: expected {name}, received {tag}");
168 break Err(TransceiverError::DataMismatch(name.clone(), tag));
169 }
170 Err(e) => {
172 error!("<{name}>: deserializing failed");
173 break Err(TransceiverError::Decode(e.to_string()));
174 }
175 }
176 }
177 Err(e) => {
178 error!("<{name}>: connection with {address} lost");
179 break Err(TransceiverError::ConnectionError(e));
180 }
181 }
182 }
183 .or_else(|e| {
184 info!("<{}>: disconnected ({})", &name, e);
185 drop(tx);
186 match e {
187 TransceiverError::StreamEnd(..) => {
188 info!("{e}");
189 Ok(())
190 }
191 _ => Err(e),
192 }
193 })
194 });
195
196 monitor.push(handle);
197 Transceiver::<U, Receiver, On> {
198 crypto,
199 endpoint: None,
200 server_address,
201 tx: None,
202 rx,
203 function,
204 state: PhantomData,
205 }
206 }
207}
208
209#[derive(Debug)]
210pub struct ReceiverBuilder<U: UniqueIdentifier> {
211 server_address: String,
212 client_address: String,
213 crypto: Option<Crypto>,
214 uid: PhantomData<U>,
215}
216impl<U: UniqueIdentifier> ReceiverBuilder<U> {
217 pub fn crypto(mut self, crypto: Crypto) -> Self {
218 self.crypto = Some(crypto);
219 self
220 }
221 pub fn build(self) -> crate::Result<Transceiver<U, Receiver>> {
222 let crypto = self.crypto.unwrap_or_default();
223 let client_config = crypto.client()?;
224 let address = self.client_address.parse::<SocketAddr>()?;
225 let mut endpoint = Endpoint::client(address)?;
226 endpoint.set_default_client_config(client_config);
227 Ok(Transceiver::new(
228 crypto,
229 self.server_address,
230 endpoint,
231 crate::InnerChannel::Unbounded,
232 ))
233 }
234}
235
236pub struct CompactRecvr {
237 crypto: Crypto,
238 endpoint: Option<quinn::Endpoint>,
239}
240impl<U: UniqueIdentifier> From<&Transceiver<U, Receiver>> for CompactRecvr {
241 fn from(value: &Transceiver<U, Receiver>) -> Self {
242 let Transceiver::<U, Receiver> {
243 crypto, endpoint, ..
244 } = value;
245 Self {
246 crypto: crypto.clone(),
247 endpoint: endpoint.clone(),
248 }
249 }
250}
251
252impl CompactRecvr {
253 pub fn spawn<V: UniqueIdentifier, A: Into<String>>(
258 &self,
259 server_address: A,
260 ) -> crate::Result<Transceiver<V, Receiver>> {
261 let Self {
262 endpoint, crypto, ..
263 } = &self;
264 let (tx, rx) = flume::unbounded();
265 Ok(Transceiver::<V, Receiver> {
266 crypto: crypto.clone(),
267 endpoint: endpoint.clone(),
268 server_address: server_address.into(),
269 tx: Some(tx),
270 rx: Some(rx),
271 function: PhantomData,
272 state: PhantomData,
273 })
274 }
275}