1use crate::network::{tcp::TcpEventQueue, udp::PingRequest, ConnectionInfo};
2use crate::state::{ExecutionContext, State};
3
4use log::*;
5use mumble_protocol::{control::ControlPacket, Serverbound};
6use mumlib::command::{Command, CommandResponse};
7use std::{
8 rc::Rc,
9 sync::{
10 atomic::{AtomicBool, AtomicU64, Ordering},
11 Arc, RwLock,
12 },
13};
14use tokio::sync::{mpsc, watch};
15
16pub async fn handle(
17 state: Arc<RwLock<State>>,
18 mut command_receiver: mpsc::UnboundedReceiver<(
19 Command,
20 mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
21 )>,
22 tcp_event_queue: TcpEventQueue,
23 ping_request_sender: mpsc::UnboundedSender<PingRequest>,
24 mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
25 mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
26) {
27 debug!("Begin listening for commands");
28 let ping_count = AtomicU64::new(0);
29 while let Some((command, mut response_sender)) = command_receiver.recv().await {
30 debug!("Received command {:?}", command);
31 let event = crate::state::handle_command(
32 Arc::clone(&state),
33 command,
34 &mut packet_sender,
35 &mut connection_info_sender,
36 );
37 match event {
38 ExecutionContext::TcpEventCallback(callbacks) => {
39 let should_handle = Rc::new(AtomicBool::new(true));
41 for (event, generator) in callbacks {
42 let should_handle = Rc::clone(&should_handle);
43 let response_sender = response_sender.clone();
44 tcp_event_queue.register_callback(
45 event,
46 Box::new(move |e| {
47 if should_handle.swap(false, Ordering::Relaxed) {
49 let response = generator(e);
50 for response in response {
51 response_sender.send(response).unwrap();
52 }
53 }
54 }),
55 );
56 }
57 }
58 ExecutionContext::TcpEventSubscriber(event, mut handler) => tcp_event_queue
59 .register_subscriber(
60 event,
61 Box::new(move |event| handler(event, &mut response_sender)),
62 ),
63 ExecutionContext::Now(generator) => {
64 for response in generator() {
65 response_sender.send(response).unwrap();
66 }
67 drop(response_sender);
68 }
69 ExecutionContext::Ping(generator, converter) => {
70 let ret = generator();
71 debug!("Ping generated: {:?}", ret);
72 match ret {
73 Ok(addr) => {
74 let id = ping_count.fetch_add(1, Ordering::Relaxed);
75 let res = ping_request_sender.send((
76 id,
77 addr,
78 Box::new(move |packet| {
79 response_sender.send(converter(packet)).unwrap();
80 }),
81 ));
82 if res.is_err() {
83 panic!();
84 }
85 }
86 Err(e) => {
87 response_sender.send(Err(e)).unwrap();
88 }
89 };
90 }
91 }
92 }
93}