Skip to main content

over_there/core/client/
mod.rs

1mod connected;
2pub mod error;
3pub mod file;
4mod inbound;
5pub mod proc;
6pub mod state;
7
8pub use connected::ConnectedClient;
9
10use crate::core::{
11    event::{AddrEventManager, EventManager},
12    msg::content::Content,
13    Transport,
14};
15use derive_builder::Builder;
16use log::warn;
17use crate::utils::Either;
18use crate::core::transport::{
19    self as wire, Authenticator, Bicrypter, NetTransmission, Wire,
20};
21use std::net::SocketAddr;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::{
25    io,
26    net::{TcpStream, UdpSocket},
27    runtime::Handle,
28    sync::{mpsc, Mutex},
29};
30
31/// Represents a client configuration prior to connecting
32#[derive(Builder)]
33pub struct Client<A, B>
34where
35    A: Authenticator,
36    B: Bicrypter,
37{
38    /// TTL to collect all packets for a msg
39    #[builder(default = "crate::core::transport::constants::DEFAULT_TTL")]
40    packet_ttl: Duration,
41
42    /// Used to sign & verify msgs
43    authenticator: A,
44
45    /// Used to encrypt & decrypt msgs
46    bicrypter: B,
47
48    /// Transportation mechanism & address to listen on
49    transport: Transport,
50
51    /// Internal buffer for cross-thread messaging
52    #[builder(default = "1000")]
53    buffer: usize,
54}
55
56impl<A, B> Client<A, B>
57where
58    A: Authenticator + Send + Sync + 'static,
59    B: Bicrypter + Send + Sync + 'static,
60{
61    /// Starts actively listening for msgs via the specified transport medium
62    pub async fn connect(self) -> io::Result<ConnectedClient> {
63        let state = Arc::new(Mutex::new(state::ClientState::default()));
64
65        match self.transport.clone() {
66            Transport::Tcp(addrs) => {
67                build_and_connect_tcp_client(self, Arc::clone(&state), &addrs)
68                    .await
69            }
70            Transport::Udp(addrs) => {
71                build_and_connect_udp_client(self, Arc::clone(&state), &addrs)
72                    .await
73            }
74        }
75    }
76}
77
78async fn build_and_connect_tcp_client<A, B>(
79    client: Client<A, B>,
80    state: Arc<Mutex<state::ClientState>>,
81    addrs: &[SocketAddr],
82) -> io::Result<ConnectedClient>
83where
84    A: Authenticator + Send + Sync + 'static,
85    B: Bicrypter + Send + Sync + 'static,
86{
87    let handle = Handle::current();
88
89    // NOTE: Tokio does not support &[SocketAddr] -> ToSocketAddrs,
90    //       so we have to loop through manually
91    // See https://github.com/tokio-rs/tokio/pull/1760#discussion_r379120864
92    let stream = {
93        let mut stream = None;
94        for addr in addrs.iter() {
95            match TcpStream::connect(addr).await {
96                Ok(s) => {
97                    stream = Some(s);
98                    break;
99                }
100                Err(x) => warn!("Failed to connect to {}: {}", addr, x),
101            }
102        }
103        stream
104            .ok_or_else(|| io::Error::from(io::ErrorKind::ConnectionRefused))?
105    };
106    let remote_addr = stream.peer_addr()?;
107    let wire = Wire::new(
108        NetTransmission::TcpEthernet.into(),
109        client.packet_ttl,
110        client.authenticator,
111        client.bicrypter,
112    );
113
114    let (tx, rx) = mpsc::channel(client.buffer);
115    let event_handle = handle.spawn(event_loop(
116        Arc::clone(&state),
117        inbound::InboundMsgReader::new(rx),
118    ));
119    let event_manager = EventManager::for_tcp_stream(
120        handle.clone(),
121        client.buffer,
122        stream,
123        remote_addr,
124        wire,
125        tx,
126    );
127
128    Ok(ConnectedClient {
129        state,
130        event_manager: Either::Left(event_manager),
131        event_handle,
132        remote_addr,
133        timeout: ConnectedClient::DEFAULT_TIMEOUT,
134    })
135}
136
137async fn build_and_connect_udp_client<A, B>(
138    client: Client<A, B>,
139    state: Arc<Mutex<state::ClientState>>,
140    addrs: &[SocketAddr],
141) -> io::Result<ConnectedClient>
142where
143    A: Authenticator + Send + Sync + 'static,
144    B: Bicrypter + Send + Sync + 'static,
145{
146    let handle = Handle::current();
147
148    // NOTE: Tokio does not support &[SocketAddr] -> ToSocketAddrs,
149    //       so we have to loop through manually
150    // See https://github.com/tokio-rs/tokio/pull/1760#discussion_r379120864
151    let (socket, remote_addr) = {
152        let mut socket_and_addr = None;
153        for addr in addrs.iter() {
154            match wire::net::udp::connect(*addr) {
155                Ok(s) => {
156                    socket_and_addr = Some((s, *addr));
157                    break;
158                }
159                Err(x) => warn!("Failed to connect to {}: {}", *addr, x),
160            }
161        }
162
163        // NOTE: Must use Handle::enter to provide proper runtime when
164        //       using UdpSocket::from_std
165        handle.enter(|| {
166            socket_and_addr
167                .ok_or_else(|| {
168                    io::Error::from(io::ErrorKind::ConnectionRefused)
169                })
170                .and_then(|(s, addr)| UdpSocket::from_std(s).map(|s| (s, addr)))
171        })?
172    };
173
174    let addr = socket.local_addr()?;
175    let transmission = NetTransmission::udp_from_addr(addr);
176
177    let wire = Wire::new(
178        transmission.into(),
179        client.packet_ttl,
180        client.authenticator,
181        client.bicrypter,
182    );
183
184    let (tx, rx) = mpsc::channel(client.buffer);
185    let event_handle = handle.spawn(event_loop(
186        Arc::clone(&state),
187        inbound::InboundMsgReader::new(rx),
188    ));
189    let addr_event_manager = AddrEventManager::for_udp_socket(
190        handle,
191        client.buffer,
192        socket,
193        wire,
194        tx,
195    );
196
197    Ok(ConnectedClient {
198        state,
199        event_manager: Either::Right(addr_event_manager),
200        event_handle,
201        remote_addr,
202        timeout: ConnectedClient::DEFAULT_TIMEOUT,
203    })
204}
205
206async fn event_loop<T>(
207    state: Arc<Mutex<state::ClientState>>,
208    mut r: inbound::InboundMsgReader<T>,
209) {
210    while let Some(msg) = r.next().await {
211        // Update the last time we received a msg from the server
212        state.lock().await.last_contact = Instant::now();
213
214        if let (Some(header), Content::Reply(reply)) =
215            (msg.parent_header.as_ref(), &msg.content)
216        {
217            state
218                .lock()
219                .await
220                .callback_manager
221                .invoke_callback(header.id, reply)
222        }
223    }
224}