ftl_protocol/
server.rs

1use std::str::FromStr;
2
3use async_std::{io, task};
4use async_std::prelude::*;
5use async_trait::async_trait;
6use async_std::net::{TcpListener, TcpStream};
7use async_std::channel::{Receiver, bounded};
8
9use log::{debug, error, info, trace};
10
11use crate::protocol::{FtlCommand, FtlError, FtlHandshake, FtlHandshakeFinalised, FtlResponse};
12use crate::util;
13
14pub struct IngestClient {
15    channel_id: Option<String>,
16    hmac_payload: String,
17    handshake: FtlHandshake,
18    stop_signal: Receiver<()>
19}
20
21#[async_trait]
22pub trait IngestServer {
23    async fn launch(&'static self, addr: String) -> Result<(), io::Error> {
24        let listener = TcpListener::bind(addr).await?;
25
26        while let Ok((stream, address)) = listener.accept().await {
27            info!("Remote client connected: {}", address);
28
29            task::spawn(async move {
30                let (reader, writer) = &mut (&stream, &stream);
31
32                // Common data needed by client / server.
33                let (sender, receiver) = bounded(1);
34                let mut client = IngestClient {
35                    channel_id: None,
36                    hmac_payload: util::generate_hmac(),
37                    handshake: FtlHandshake::default(),
38                    stop_signal: receiver,
39                };
40
41                // Socket reader
42                let mut reader = reader.bytes();
43                let mut buffer = Vec::with_capacity(128);
44                while let Some(byte) = reader.next().await {
45                    if let Ok(byte) = byte {
46                        match byte {
47                            b'\n' => {
48                                if buffer.len() > 0 {
49                                    if let Ok(payload) = std::str::from_utf8(&buffer) {
50                                        if let Ok(command) = FtlCommand::from_str(payload) {
51                                            if let Err(error) = self.handler(&mut client, writer, command).await {
52                                                if error.is_err() {
53                                                    error!("Failed to execute FTL command. {:?}", error);
54                                                }
55
56                                                writer.write(
57                                                    error
58                                                        .to_string()
59                                                        .as_bytes()
60                                                )
61                                                .await
62                                                .ok();
63
64                                                break;
65                                            }
66                                        } else {
67                                            error!("Failed to deserialise FTL command. {}", payload);
68                                        }
69                                    } else {
70                                        error!("Failed to convert buffer to UTF8 string.");
71                                    }
72
73                                    buffer.clear();
74                                }
75                            }
76                            // Ignore carriage returns in our implementation.
77                            b'\r' => continue,
78                            byte => buffer.push(byte)
79                        }
80                    } else {
81                        error!("Failed to read anymore bytes from client.");
82                        break;
83                    }
84                }
85
86                info!("Remote FTL client disconnected.");
87                sender.send(()).await.ok();
88                stream.shutdown(std::net::Shutdown::Both).ok();
89            });
90        }
91
92        Ok(())
93    }
94
95    async fn handler(&self, client: &mut IngestClient, mut writer: &TcpStream, command: FtlCommand) -> Result<(), FtlError> {
96        match command {
97            FtlCommand::HMAC => {
98                debug!("Client requested HMAC payload, sending response.");
99                writer.write(
100                    FtlResponse::HMAC {
101                        hmac_payload: client.hmac_payload.clone()
102                    }
103                    .to_string()
104                    .as_bytes()
105                )
106                .await
107                .map_err(|_| FtlError::IoError)?;
108    
109                Ok(())
110            }
111            FtlCommand::Connect { channel_id, hashed_hmac_payload } => {
112                debug!("Client is connecting, attempting to stream to {}.", &channel_id);
113                let known_key = self.get_stream_key(&channel_id)
114                    .await.map_err(|_| FtlError::InvalidStreamKey)?;
115    
116                // * Key starts with $, omit and decode.
117                let client_hash = hex::decode(hashed_hmac_payload[1..].to_string())
118                    .map_err(|_| FtlError::DecodeError)?;
119                
120                let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA512, &known_key.as_bytes());
121    
122                ring::hmac::verify(
123                    &key,
124                    &hex::decode(client.hmac_payload.clone().into_bytes())
125                        .map_err(|_| FtlError::DecodeError)?,
126                    &client_hash.as_slice()
127                ).map_err(|_| FtlError::RingError)?;
128
129                debug!("Client was verified, ready to stream to {}.", &channel_id);
130                client.channel_id = Some(channel_id);
131
132                writer.write(
133                    FtlResponse::Success
134                        .to_string()
135                        .as_bytes()
136                )
137                .await
138                .map_err(|_| FtlError::IoError)?;
139
140                Ok(())
141            }
142            FtlCommand::Attribute { key, value } => client.handshake.insert(key, value),
143            FtlCommand::Dot => {
144                if let Some(channel_id) = &client.channel_id {
145                    let handshake = client.handshake.clone().finalise()?;
146                    let udp_port = self.allocate_ingest(channel_id, handshake, client.stop_signal.clone())
147                        .await.map_err(|_| FtlError::AllocateError)?;
148                    
149                    debug!("Client is about to begin stream. Allocated port {}.", udp_port);
150                    writer.write(
151                        FtlResponse::Connect { udp_port }
152                            .to_string()
153                            .as_bytes()
154                    )
155                    .await
156                    .map_err(|_| FtlError::IoError)?;
157
158                    Ok(())
159                } else {
160                    Err(FtlError::InvalidStreamKey)
161                }
162            }
163            FtlCommand::Ping { channel_id } => {
164                trace!("Client sent ping. {}", &channel_id);
165                writer.write(
166                    FtlResponse::Pong
167                        .to_string()
168                        .as_bytes()
169                )
170                .await
171                .map_err(|_| FtlError::IoError)?;
172
173                Ok(())
174            }
175            FtlCommand::Disconnect => Err(FtlError::Disconnect)
176        }
177    }
178
179    async fn get_stream_key(&self, channel_id: &str) -> Result<String, ()>;
180    async fn allocate_ingest(&self, channel_id: &str, handshake: FtlHandshakeFinalised, stop_signal: Receiver<()>) -> Result<u16, ()>;
181}