1mod connected;
2pub mod error;
3pub mod file;
4mod inbound;
5pub mod proc;
6pub mod state;
7
8pub use connected::ConnectedClient;
9
10use crate::core::{
11 event::{AddrEventManager, EventManager},
12 msg::content::Content,
13 Transport,
14};
15use derive_builder::Builder;
16use log::warn;
17use crate::utils::Either;
18use crate::core::transport::{
19 self as wire, Authenticator, Bicrypter, NetTransmission, Wire,
20};
21use std::net::SocketAddr;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::{
25 io,
26 net::{TcpStream, UdpSocket},
27 runtime::Handle,
28 sync::{mpsc, Mutex},
29};
30
31#[derive(Builder)]
33pub struct Client<A, B>
34where
35 A: Authenticator,
36 B: Bicrypter,
37{
38 #[builder(default = "crate::core::transport::constants::DEFAULT_TTL")]
40 packet_ttl: Duration,
41
42 authenticator: A,
44
45 bicrypter: B,
47
48 transport: Transport,
50
51 #[builder(default = "1000")]
53 buffer: usize,
54}
55
56impl<A, B> Client<A, B>
57where
58 A: Authenticator + Send + Sync + 'static,
59 B: Bicrypter + Send + Sync + 'static,
60{
61 pub async fn connect(self) -> io::Result<ConnectedClient> {
63 let state = Arc::new(Mutex::new(state::ClientState::default()));
64
65 match self.transport.clone() {
66 Transport::Tcp(addrs) => {
67 build_and_connect_tcp_client(self, Arc::clone(&state), &addrs)
68 .await
69 }
70 Transport::Udp(addrs) => {
71 build_and_connect_udp_client(self, Arc::clone(&state), &addrs)
72 .await
73 }
74 }
75 }
76}
77
78async fn build_and_connect_tcp_client<A, B>(
79 client: Client<A, B>,
80 state: Arc<Mutex<state::ClientState>>,
81 addrs: &[SocketAddr],
82) -> io::Result<ConnectedClient>
83where
84 A: Authenticator + Send + Sync + 'static,
85 B: Bicrypter + Send + Sync + 'static,
86{
87 let handle = Handle::current();
88
89 let stream = {
93 let mut stream = None;
94 for addr in addrs.iter() {
95 match TcpStream::connect(addr).await {
96 Ok(s) => {
97 stream = Some(s);
98 break;
99 }
100 Err(x) => warn!("Failed to connect to {}: {}", addr, x),
101 }
102 }
103 stream
104 .ok_or_else(|| io::Error::from(io::ErrorKind::ConnectionRefused))?
105 };
106 let remote_addr = stream.peer_addr()?;
107 let wire = Wire::new(
108 NetTransmission::TcpEthernet.into(),
109 client.packet_ttl,
110 client.authenticator,
111 client.bicrypter,
112 );
113
114 let (tx, rx) = mpsc::channel(client.buffer);
115 let event_handle = handle.spawn(event_loop(
116 Arc::clone(&state),
117 inbound::InboundMsgReader::new(rx),
118 ));
119 let event_manager = EventManager::for_tcp_stream(
120 handle.clone(),
121 client.buffer,
122 stream,
123 remote_addr,
124 wire,
125 tx,
126 );
127
128 Ok(ConnectedClient {
129 state,
130 event_manager: Either::Left(event_manager),
131 event_handle,
132 remote_addr,
133 timeout: ConnectedClient::DEFAULT_TIMEOUT,
134 })
135}
136
137async fn build_and_connect_udp_client<A, B>(
138 client: Client<A, B>,
139 state: Arc<Mutex<state::ClientState>>,
140 addrs: &[SocketAddr],
141) -> io::Result<ConnectedClient>
142where
143 A: Authenticator + Send + Sync + 'static,
144 B: Bicrypter + Send + Sync + 'static,
145{
146 let handle = Handle::current();
147
148 let (socket, remote_addr) = {
152 let mut socket_and_addr = None;
153 for addr in addrs.iter() {
154 match wire::net::udp::connect(*addr) {
155 Ok(s) => {
156 socket_and_addr = Some((s, *addr));
157 break;
158 }
159 Err(x) => warn!("Failed to connect to {}: {}", *addr, x),
160 }
161 }
162
163 handle.enter(|| {
166 socket_and_addr
167 .ok_or_else(|| {
168 io::Error::from(io::ErrorKind::ConnectionRefused)
169 })
170 .and_then(|(s, addr)| UdpSocket::from_std(s).map(|s| (s, addr)))
171 })?
172 };
173
174 let addr = socket.local_addr()?;
175 let transmission = NetTransmission::udp_from_addr(addr);
176
177 let wire = Wire::new(
178 transmission.into(),
179 client.packet_ttl,
180 client.authenticator,
181 client.bicrypter,
182 );
183
184 let (tx, rx) = mpsc::channel(client.buffer);
185 let event_handle = handle.spawn(event_loop(
186 Arc::clone(&state),
187 inbound::InboundMsgReader::new(rx),
188 ));
189 let addr_event_manager = AddrEventManager::for_udp_socket(
190 handle,
191 client.buffer,
192 socket,
193 wire,
194 tx,
195 );
196
197 Ok(ConnectedClient {
198 state,
199 event_manager: Either::Right(addr_event_manager),
200 event_handle,
201 remote_addr,
202 timeout: ConnectedClient::DEFAULT_TIMEOUT,
203 })
204}
205
206async fn event_loop<T>(
207 state: Arc<Mutex<state::ClientState>>,
208 mut r: inbound::InboundMsgReader<T>,
209) {
210 while let Some(msg) = r.next().await {
211 state.lock().await.last_contact = Instant::now();
213
214 if let (Some(header), Content::Reply(reply)) =
215 (msg.parent_header.as_ref(), &msg.content)
216 {
217 state
218 .lock()
219 .await
220 .callback_manager
221 .invoke_callback(header.id, reply)
222 }
223 }
224}