gmt_dos_clients_transceiver/
lib.rs

1/*!
2# GMT DOS Actors Transceiver
3
4`gmt_dos-clients_transceiver` provides implementation for two GMT DOS actors clients: a [Transmitter]
5and a [Receiver] allowing to transfer [Data] between GMT DOS actors models through the network.
6
7The communication betweem the transmitter and the receiver is secured by procuring a signed certificate
8shared by both the transmitter and the receiver and a private key for the transmitter only (see also [Crypto]).
9
10The certificate and the private key are generated with
11`
12cargo run --bin crypto
13`
14
15[Data]: https://docs.rs/gmt_dos-clients/latest/gmt_dos_clients/interface/struct.Data.html
16*/
17
18mod crypto;
19mod monitor;
20mod receiver;
21mod transmitter;
22
23use std::{any::type_name, marker::PhantomData};
24
25use interface::{Data, Read, UniqueIdentifier, Update, Write};
26use quinn::Endpoint;
27
28pub use crypto::Crypto;
29pub use monitor::Monitor;
30pub use receiver::CompactRecvr;
31pub use transmitter::TransmitterBuilder;
32
33#[derive(Debug, thiserror::Error)]
34pub enum TransceiverError {
35    #[error("failed to parse IP socket address")]
36    Socket(#[from] std::net::AddrParseError),
37    // #[error("failed to bind endpoint to socket address")]
38    // IO(#[from] std::io::Result<Endpoint>),
39    #[error("connection failed")]
40    ConnectionError(#[from] quinn::ConnectionError),
41    #[error("failed to connect")]
42    ConnectError(#[from] quinn::ConnectError),
43    #[error(transparent)]
44    IO(#[from] std::io::Error),
45    #[error("encryption failed")]
46    Crypto(#[from] rustls::Error),
47    #[error("failed to send data to receiver")]
48    SendToRx(#[from] quinn::WriteError),
49    #[error("data serialization failed ({0})")]
50    Encode(String),
51    #[error("data deserialization failed ({0})")]
52    Decode(String),
53    #[error("failed to read data from transmitter")]
54    RecvFromTx(#[from] quinn::ReadToEndError),
55    #[error("failed to join task")]
56    Join(#[from] tokio::task::JoinError),
57    #[error("expected {0}, received {1}")]
58    DataMismatch(String, String),
59    #[error("{0} stream ended: {1} in {2} ({3}/s)")]
60    StreamEnd(String, String, String, String),
61    #[error("failed to encode data")]
62    BincodeEncode(#[from] bincode::error::EncodeError),
63    #[error("failed to decode data")]
64    BincodeDecode(#[from] bincode::error::DecodeError),
65    #[error("")]
66    Duration(#[from] quinn_proto::VarIntBoundsExceeded),
67}
68pub type Result<T> = std::result::Result<T, TransceiverError>;
69
70/// Receiver functionality of a [Transceiver]
71pub enum Receiver {}
72/// Transmitter functionality of a [Transceiver]
73pub enum Transmitter {}
74/// [Transceiver] without purpose
75pub enum Unset {}
76trait RxOrTx {}
77impl RxOrTx for Transmitter {}
78impl RxOrTx for Receiver {}
79
80pub enum On {}
81pub enum Off {}
82
83#[derive(Default, Debug)]
84pub enum InnerChannel {
85    Bounded(usize),
86    #[default]
87    Unbounded,
88}
89
90/// Transmitter and receiver of [gmt_dos-actors](https://docs.rs/gmt_dos-actors/) [Data](https://docs.rs/gmt_dos-clients/latest/gmt_dos_clients/interface/struct.Data.html)
91pub struct Transceiver<U: UniqueIdentifier, F = Unset, S = Off> {
92    crypto: Crypto,
93    endpoint: Option<quinn::Endpoint>,
94    server_address: String,
95    tx: Option<flume::Sender<Data<U>>>,
96    pub rx: Option<flume::Receiver<Data<U>>>,
97    function: PhantomData<F>,
98    state: PhantomData<S>,
99}
100impl<U: UniqueIdentifier, F> Transceiver<U, F> {
101    pub fn new<S: Into<String>>(
102        crypto: Crypto,
103        server_address: S,
104        endpoint: Endpoint,
105        inner_channel: InnerChannel,
106    ) -> Self {
107        let (tx, rx) = match inner_channel {
108            InnerChannel::Bounded(cap) => flume::bounded(cap),
109            InnerChannel::Unbounded => flume::unbounded(),
110        };
111        Self {
112            crypto,
113            server_address: server_address.into(),
114            endpoint: Some(endpoint),
115            tx: Some(tx),
116            rx: Some(rx),
117            function: PhantomData,
118            state: PhantomData,
119        }
120    }
121}
122impl<U: UniqueIdentifier, F, S> Transceiver<U, F, S> {
123    pub fn take_channel_receiver(&mut self) -> Option<flume::Receiver<Data<U>>> {
124        self.rx.take()
125    }
126    pub fn take_channel_transmitter(&mut self) -> Option<flume::Sender<Data<U>>> {
127        self.tx.take()
128    }
129}
130
131impl<U: UniqueIdentifier, F, S> std::fmt::Debug for Transceiver<U, F, S> {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        f.debug_struct("Transceiver")
134            .field("crypto", &self.crypto)
135            .field("endpoint", &self.endpoint)
136            .field("server_address", &self.server_address)
137            .field("tx", &self.tx)
138            .field("rx", &self.rx)
139            .field("function", &self.function)
140            .field("state", &self.state)
141            .finish()
142    }
143}
144
145impl<U: UniqueIdentifier, F, S> std::fmt::Display for Transceiver<U, F, S> {
146    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147        writeln!(
148            f,
149            "Transceiver[{}] @ {}",
150            type_name::<F>(),
151            self.server_address
152        )?;
153        if let Some(rx) = &self.rx {
154            if rx.is_disconnected() {
155                writeln!(f, " . RX is disconnected")?;
156            } else {
157                if rx.is_empty() {
158                    writeln!(f, " . RX is empty")?;
159                } else {
160                    if rx.is_full() {
161                        writeln!(f, " . RX is full")?;
162                    } else {
163                        writeln!(f, " . RX is holding {} messages", rx.len())?;
164                    }
165                }
166            }
167        }
168        if let Some(tx) = &self.tx {
169            if tx.is_disconnected() {
170                writeln!(f, " . TX is disconnected")?;
171            } else {
172                if tx.is_empty() {
173                    writeln!(f, " . TX is empty")?;
174                } else {
175                    if tx.is_full() {
176                        writeln!(f, " . TX is full")?;
177                    } else {
178                        writeln!(f, " . TX is holding {} messages", tx.len())?;
179                    }
180                }
181            }
182        }
183        Ok(())
184    }
185}
186
187/* impl<U: UniqueIdentifier, V: UniqueIdentifier, F> From<&Transceiver<U, F>> for Transceiver<V, F> {
188    fn from(other: &Transceiver<U, F>) -> Self {
189        let (tx, rx) = flume::unbounded();
190        Self {
191            crypto: other.crypto.clone(),
192            server_address: other.server_address.clone(),
193            endpoint: other.endpoint.clone(),
194            tx: Some(tx),
195            rx: Some(rx),
196            function: PhantomData,
197            state: PhantomData,
198        }
199    }
200} */
201
202impl<U: UniqueIdentifier, F: RxOrTx + Send + Sync> Update for Transceiver<U, F, On> {}
203
204impl<U: UniqueIdentifier> Read<U> for Transceiver<U, Transmitter, On> {
205    fn read(&mut self, data: Data<U>) {
206        if let Some(tx) = self.tx.as_ref() {
207            let _ = tx.send(data);
208        }
209    }
210}
211
212impl<U: UniqueIdentifier> Write<U> for Transceiver<U, Receiver, On> {
213    fn write(&mut self) -> Option<Data<U>> {
214        // if let Some(rx) = self.rx.as_ref() {
215        //     if let Ok(data) = rx.recv() {
216        //         info!("data forwarded");
217        //         Some(data)
218        //     } else {
219        //         info!("rx failed");
220        //         None
221        //     }
222        // } else {
223        //     info!("no rx");
224        //     None
225        // }
226        self.rx.as_ref().and_then(|rx| rx.recv().ok())
227    }
228}
229
230pub fn trim(name: &str) -> String {
231    if let Some((prefix, suffix)) = name.split_once('<') {
232        let generics: Vec<_> = suffix.split(',').map(|s| trim(s)).collect();
233        format!("{}<{}", trim(prefix), generics.join(","))
234    } else {
235        if let Some((_, suffix)) = name.rsplit_once("::") {
236            suffix.into()
237        } else {
238            name.into()
239        }
240    }
241}