mum_cli/
command.rs

1use crate::network::{tcp::TcpEventQueue, udp::PingRequest, ConnectionInfo};
2use crate::state::{ExecutionContext, State};
3
4use log::*;
5use mumble_protocol::{control::ControlPacket, Serverbound};
6use mumlib::command::{Command, CommandResponse};
7use std::{
8    rc::Rc,
9    sync::{
10        atomic::{AtomicBool, AtomicU64, Ordering},
11        Arc, RwLock,
12    },
13};
14use tokio::sync::{mpsc, watch};
15
16pub async fn handle(
17    state: Arc<RwLock<State>>,
18    mut command_receiver: mpsc::UnboundedReceiver<(
19        Command,
20        mpsc::UnboundedSender<mumlib::error::Result<Option<CommandResponse>>>,
21    )>,
22    tcp_event_queue: TcpEventQueue,
23    ping_request_sender: mpsc::UnboundedSender<PingRequest>,
24    mut packet_sender: mpsc::UnboundedSender<ControlPacket<Serverbound>>,
25    mut connection_info_sender: watch::Sender<Option<ConnectionInfo>>,
26) {
27    debug!("Begin listening for commands");
28    let ping_count = AtomicU64::new(0);
29    while let Some((command, mut response_sender)) = command_receiver.recv().await {
30        debug!("Received command {:?}", command);
31        let event = crate::state::handle_command(
32            Arc::clone(&state),
33            command,
34            &mut packet_sender,
35            &mut connection_info_sender,
36        );
37        match event {
38            ExecutionContext::TcpEventCallback(callbacks) => {
39                // A shared bool ensures that only one of the supplied callbacks is run.
40                let should_handle = Rc::new(AtomicBool::new(true));
41                for (event, generator) in callbacks {
42                    let should_handle = Rc::clone(&should_handle);
43                    let response_sender = response_sender.clone();
44                    tcp_event_queue.register_callback(
45                        event,
46                        Box::new(move |e| {
47                            // If should_handle == true no other callback has been run yet.
48                            if should_handle.swap(false, Ordering::Relaxed) {
49                                let response = generator(e);
50                                for response in response {
51                                    response_sender.send(response).unwrap();
52                                }
53                            }
54                        }),
55                    );
56                }
57            }
58            ExecutionContext::TcpEventSubscriber(event, mut handler) => tcp_event_queue
59                .register_subscriber(
60                    event,
61                    Box::new(move |event| handler(event, &mut response_sender)),
62                ),
63            ExecutionContext::Now(generator) => {
64                for response in generator() {
65                    response_sender.send(response).unwrap();
66                }
67                drop(response_sender);
68            }
69            ExecutionContext::Ping(generator, converter) => {
70                let ret = generator();
71                debug!("Ping generated: {:?}", ret);
72                match ret {
73                    Ok(addr) => {
74                        let id = ping_count.fetch_add(1, Ordering::Relaxed);
75                        let res = ping_request_sender.send((
76                            id,
77                            addr,
78                            Box::new(move |packet| {
79                                response_sender.send(converter(packet)).unwrap();
80                            }),
81                        ));
82                        if res.is_err() {
83                            panic!();
84                        }
85                    }
86                    Err(e) => {
87                        response_sender.send(Err(e)).unwrap();
88                    }
89                };
90            }
91        }
92    }
93}