1use crate::error::ClientError;
2use crate::network::{tcp, udp, ConnectionInfo};
3use crate::state::State;
4use crate::{command, network::tcp::TcpEventQueue};
5
6use futures_util::{select, FutureExt};
7use mumble_protocol::{control::ControlPacket, crypt::ClientCryptState, Serverbound};
8use mumlib::command::{Command, CommandResponse};
9use std::sync::{Arc, RwLock};
10use tokio::sync::{mpsc, watch};
11
12pub async fn handle(
13 state: State,
14 command_receiver: mpsc::UnboundedReceiver<(
15 Command,
16 mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
17 )>,
18) -> Result<(), ClientError> {
19 let (connection_info_sender, connection_info_receiver) =
20 watch::channel::<Option<ConnectionInfo>>(None);
21 let (crypt_state_sender, crypt_state_receiver) = mpsc::channel::<ClientCryptState>(1);
22 let (packet_sender, packet_receiver) = mpsc::unbounded_channel::<ControlPacket<Serverbound>>();
23 let (ping_request_sender, ping_request_receiver) = mpsc::unbounded_channel();
24 let event_queue = TcpEventQueue::new();
25
26 let state = Arc::new(RwLock::new(state));
27
28 select! {
29 r = tcp::handle(
30 Arc::clone(&state),
31 connection_info_receiver.clone(),
32 crypt_state_sender,
33 packet_sender.clone(),
34 packet_receiver,
35 event_queue.clone(),
36 ).fuse() => r.map_err(ClientError::TcpError),
37 _ = udp::handle(
38 Arc::clone(&state),
39 connection_info_receiver.clone(),
40 crypt_state_receiver,
41 ).fuse() => Ok(()),
42 _ = command::handle(
43 state,
44 command_receiver,
45 event_queue,
46 ping_request_sender,
47 packet_sender,
48 connection_info_sender,
49 ).fuse() => Ok(()),
50 _ = udp::handle_pings(ping_request_receiver).fuse() => Ok(()),
51 }
52}