gmt_dos_clients_transceiver/
lib.rs1mod crypto;
19mod monitor;
20mod receiver;
21mod transmitter;
22
23use std::{any::type_name, marker::PhantomData};
24
25use interface::{Data, Read, UniqueIdentifier, Update, Write};
26use quinn::Endpoint;
27
28pub use crypto::Crypto;
29pub use monitor::Monitor;
30pub use receiver::CompactRecvr;
31pub use transmitter::TransmitterBuilder;
32
33#[derive(Debug, thiserror::Error)]
34pub enum TransceiverError {
35 #[error("failed to parse IP socket address")]
36 Socket(#[from] std::net::AddrParseError),
37 #[error("connection failed")]
40 ConnectionError(#[from] quinn::ConnectionError),
41 #[error("failed to connect")]
42 ConnectError(#[from] quinn::ConnectError),
43 #[error(transparent)]
44 IO(#[from] std::io::Error),
45 #[error("encryption failed")]
46 Crypto(#[from] rustls::Error),
47 #[error("failed to send data to receiver")]
48 SendToRx(#[from] quinn::WriteError),
49 #[error("data serialization failed ({0})")]
50 Encode(String),
51 #[error("data deserialization failed ({0})")]
52 Decode(String),
53 #[error("failed to read data from transmitter")]
54 RecvFromTx(#[from] quinn::ReadToEndError),
55 #[error("failed to join task")]
56 Join(#[from] tokio::task::JoinError),
57 #[error("expected {0}, received {1}")]
58 DataMismatch(String, String),
59 #[error("{0} stream ended: {1} in {2} ({3}/s)")]
60 StreamEnd(String, String, String, String),
61 #[error("failed to encode data")]
62 BincodeEncode(#[from] bincode::error::EncodeError),
63 #[error("failed to decode data")]
64 BincodeDecode(#[from] bincode::error::DecodeError),
65 #[error("")]
66 Duration(#[from] quinn_proto::VarIntBoundsExceeded),
67}
68pub type Result<T> = std::result::Result<T, TransceiverError>;
69
/// Type-state marker: the transceiver acts as a receiver.
///
/// Uninhabited: it is only ever used as a type parameter.
#[derive(Debug)]
pub enum Receiver {}

/// Type-state marker: the transceiver acts as a transmitter.
///
/// Uninhabited: it is only ever used as a type parameter.
#[derive(Debug)]
pub enum Transmitter {}

/// Type-state marker: the transceiver has not been given a function yet.
///
/// This is the default function parameter of `Transceiver`.
#[derive(Debug)]
pub enum Unset {}

/// Private marker trait implemented only by [`Transmitter`] and [`Receiver`],
/// i.e. by the functions a transceiver can actually perform.
trait RxOrTx {}
impl RxOrTx for Transmitter {}
impl RxOrTx for Receiver {}

/// Type-state marker: the "on" state of a transceiver.
#[derive(Debug)]
pub enum On {}

/// Type-state marker: the "off" state of a transceiver (the default state
/// parameter of `Transceiver`).
#[derive(Debug)]
pub enum Off {}
/// Kind of in-process channel created by `Transceiver::new`.
///
/// The type is a plain configuration value, so it derives the usual value
/// traits (`Clone`, `Copy`, `PartialEq`, `Eq`) in addition to `Debug` and
/// `Default`.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub enum InnerChannel {
    /// Channel holding at most the given number of messages.
    Bounded(usize),
    /// Channel without a capacity limit (the default).
    #[default]
    Unbounded,
}
89
90pub struct Transceiver<U: UniqueIdentifier, F = Unset, S = Off> {
92 crypto: Crypto,
93 endpoint: Option<quinn::Endpoint>,
94 server_address: String,
95 tx: Option<flume::Sender<Data<U>>>,
96 pub rx: Option<flume::Receiver<Data<U>>>,
97 function: PhantomData<F>,
98 state: PhantomData<S>,
99}
100impl<U: UniqueIdentifier, F> Transceiver<U, F> {
101 pub fn new<S: Into<String>>(
102 crypto: Crypto,
103 server_address: S,
104 endpoint: Endpoint,
105 inner_channel: InnerChannel,
106 ) -> Self {
107 let (tx, rx) = match inner_channel {
108 InnerChannel::Bounded(cap) => flume::bounded(cap),
109 InnerChannel::Unbounded => flume::unbounded(),
110 };
111 Self {
112 crypto,
113 server_address: server_address.into(),
114 endpoint: Some(endpoint),
115 tx: Some(tx),
116 rx: Some(rx),
117 function: PhantomData,
118 state: PhantomData,
119 }
120 }
121}
122impl<U: UniqueIdentifier, F, S> Transceiver<U, F, S> {
123 pub fn take_channel_receiver(&mut self) -> Option<flume::Receiver<Data<U>>> {
124 self.rx.take()
125 }
126 pub fn take_channel_transmitter(&mut self) -> Option<flume::Sender<Data<U>>> {
127 self.tx.take()
128 }
129}
130
131impl<U: UniqueIdentifier, F, S> std::fmt::Debug for Transceiver<U, F, S> {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("Transceiver")
134 .field("crypto", &self.crypto)
135 .field("endpoint", &self.endpoint)
136 .field("server_address", &self.server_address)
137 .field("tx", &self.tx)
138 .field("rx", &self.rx)
139 .field("function", &self.function)
140 .field("state", &self.state)
141 .finish()
142 }
143}
144
145impl<U: UniqueIdentifier, F, S> std::fmt::Display for Transceiver<U, F, S> {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 writeln!(
148 f,
149 "Transceiver[{}] @ {}",
150 type_name::<F>(),
151 self.server_address
152 )?;
153 if let Some(rx) = &self.rx {
154 if rx.is_disconnected() {
155 writeln!(f, " . RX is disconnected")?;
156 } else {
157 if rx.is_empty() {
158 writeln!(f, " . RX is empty")?;
159 } else {
160 if rx.is_full() {
161 writeln!(f, " . RX is full")?;
162 } else {
163 writeln!(f, " . RX is holding {} messages", rx.len())?;
164 }
165 }
166 }
167 }
168 if let Some(tx) = &self.tx {
169 if tx.is_disconnected() {
170 writeln!(f, " . TX is disconnected")?;
171 } else {
172 if tx.is_empty() {
173 writeln!(f, " . TX is empty")?;
174 } else {
175 if tx.is_full() {
176 writeln!(f, " . TX is full")?;
177 } else {
178 writeln!(f, " . TX is holding {} messages", tx.len())?;
179 }
180 }
181 }
182 }
183 Ok(())
184 }
185}
186
187impl<U: UniqueIdentifier, F: RxOrTx + Send + Sync> Update for Transceiver<U, F, On> {}
203
204impl<U: UniqueIdentifier> Read<U> for Transceiver<U, Transmitter, On> {
205 fn read(&mut self, data: Data<U>) {
206 if let Some(tx) = self.tx.as_ref() {
207 let _ = tx.send(data);
208 }
209 }
210}
211
212impl<U: UniqueIdentifier> Write<U> for Transceiver<U, Receiver, On> {
213 fn write(&mut self) -> Option<Data<U>> {
214 self.rx.as_ref().and_then(|rx| rx.recv().ok())
227 }
228}
229
/// Shortens a type name by removing the module path of every path segment.
///
/// Without a `<`, everything up to and including the last `::` is removed.
/// Otherwise the text before the first `<` and each comma-separated piece
/// after it are shortened recursively and reassembled with `,` as separator.
///
/// For example `a::b::C<x::Y, z::W>` becomes `C<Y,W>`. A piece that contains
/// no `::` is kept verbatim, including any leading space.
pub fn trim(name: &str) -> String {
    match name.split_once('<') {
        Some((head, parameters)) => {
            let parameters = parameters
                .split(',')
                .map(trim)
                .collect::<Vec<_>>()
                .join(",");
            format!("{}<{}", trim(head), parameters)
        }
        None => name
            .rsplit_once("::")
            .map_or(name, |(_, last)| last)
            .to_string(),
    }
}