sc2_proxy/remote_control/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
//! Remote control endpoint for the proxy server.
//! Allows only one connection.
//! Commands are taken through a TCP socket in JSON format.
//! This is a custom RPC server.

pub mod message;

use bufstream::BufStream;
use crossbeam::channel::{self, Receiver, Sender};
use log::{debug, info, warn};
use std::io;
use std::io::{BufRead, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

use serde::Serialize;
use serde_json;

use self::message::{Request, Response, Update};

#[allow(missing_docs)]
pub struct Remote {
    pub recv: Receiver<Request>,
    pub send: Sender<Response>,
    pub update: Sender<Update>,
    pub handle: thread::JoinHandle<()>,
}
impl Remote {
    /// Receive a message, if any available
    pub fn try_recv(&mut self) -> Option<Request> {
        self.recv.try_recv().ok()
    }

    /// Send a message
    /// Panics if the channel is disconnected
    pub fn send(&mut self, msg: Response) {
        self.send.send(msg).expect("Disconnected");
    }
}

fn to_json_line<T>(v: &T) -> Vec<u8>
where
    T: Serialize,
{
    let mut vs = serde_json::to_vec(v).expect("JSON writing failed");
    vs.push(b'\n');
    vs
}

fn process_line(
    mut stream: BufStream<TcpStream>, tx_recv: &mut Sender<Request>, rx_send: &mut Receiver<Response>,
    rx_update: &mut Receiver<Update>,
) -> io::Result<()> {
    loop {
        let mut line = String::new();
        let mut updates: Vec<Update> = Vec::new();
        stream.read_line(&mut line)?;

        match serde_json::from_str::<Request>(&line) {
            Ok(req) => {
                debug!("Request: {:?}", req);
                tx_recv.send(req).expect("Could not send");
                let resp = rx_send.recv().expect("Could not recv");
                while let Ok(u) = rx_update.try_recv() {
                    updates.push(u);
                }
                debug!("Response: {:?}", resp);

                stream.write(&to_json_line(&resp))?;

                if resp == message::Response::Quit {
                    return Ok(());
                }
            },
            Err(e) => {
                stream.write(&to_json_line(&Response::Error(format!("Invalid request: {}", e))))?;
            },
        };
        stream.flush()?;

        for update in updates {
            stream.write(&to_json_line(&update))?;
        }
    }
}

/// Run the remote control server
pub fn run_server(addr: &str) -> Remote {
    let (mut tx_recv, rx_recv) = channel::unbounded::<Request>();
    let (tx_send, mut rx_send) = channel::unbounded::<Response>();
    let (tx_update, mut rx_update) = channel::unbounded::<Update>();

    let listener = TcpListener::bind(addr).expect("Could not listen to rc port");
    let handle = thread::spawn(move || {
        debug!("Ready to accept connections");
        loop {
            let stream = match listener.accept() {
                Ok((s, addr)) => {
                    info!("Connection from {:?} accepted", addr);
                    BufStream::new(s)
                },
                Err(e) => {
                    warn!("Accept failed: {:?}", e);
                    continue;
                },
            };

            match process_line(stream, &mut tx_recv, &mut rx_send, &mut rx_update) {
                Ok(()) => break,
                Err(e) => warn!("Connection closed: {:?}", e),
            }
        }
    });

    Remote {
        recv: rx_recv,
        send: tx_send,
        update: tx_update,
        handle,
    }
}