1use std::str::FromStr;
2
3use async_std::{io, task};
4use async_std::prelude::*;
5use async_trait::async_trait;
6use async_std::net::{TcpListener, TcpStream};
7use async_std::channel::{Receiver, bounded};
8
9use log::{debug, error, info, trace};
10
11use crate::protocol::{FtlCommand, FtlError, FtlHandshake, FtlHandshakeFinalised, FtlResponse};
12use crate::util;
13
14pub struct IngestClient {
15 channel_id: Option<String>,
16 hmac_payload: String,
17 handshake: FtlHandshake,
18 stop_signal: Receiver<()>
19}
20
21#[async_trait]
22pub trait IngestServer {
23 async fn launch(&'static self, addr: String) -> Result<(), io::Error> {
24 let listener = TcpListener::bind(addr).await?;
25
26 while let Ok((stream, address)) = listener.accept().await {
27 info!("Remote client connected: {}", address);
28
29 task::spawn(async move {
30 let (reader, writer) = &mut (&stream, &stream);
31
32 let (sender, receiver) = bounded(1);
34 let mut client = IngestClient {
35 channel_id: None,
36 hmac_payload: util::generate_hmac(),
37 handshake: FtlHandshake::default(),
38 stop_signal: receiver,
39 };
40
41 let mut reader = reader.bytes();
43 let mut buffer = Vec::with_capacity(128);
44 while let Some(byte) = reader.next().await {
45 if let Ok(byte) = byte {
46 match byte {
47 b'\n' => {
48 if buffer.len() > 0 {
49 if let Ok(payload) = std::str::from_utf8(&buffer) {
50 if let Ok(command) = FtlCommand::from_str(payload) {
51 if let Err(error) = self.handler(&mut client, writer, command).await {
52 if error.is_err() {
53 error!("Failed to execute FTL command. {:?}", error);
54 }
55
56 writer.write(
57 error
58 .to_string()
59 .as_bytes()
60 )
61 .await
62 .ok();
63
64 break;
65 }
66 } else {
67 error!("Failed to deserialise FTL command. {}", payload);
68 }
69 } else {
70 error!("Failed to convert buffer to UTF8 string.");
71 }
72
73 buffer.clear();
74 }
75 }
76 b'\r' => continue,
78 byte => buffer.push(byte)
79 }
80 } else {
81 error!("Failed to read anymore bytes from client.");
82 break;
83 }
84 }
85
86 info!("Remote FTL client disconnected.");
87 sender.send(()).await.ok();
88 stream.shutdown(std::net::Shutdown::Both).ok();
89 });
90 }
91
92 Ok(())
93 }
94
95 async fn handler(&self, client: &mut IngestClient, mut writer: &TcpStream, command: FtlCommand) -> Result<(), FtlError> {
96 match command {
97 FtlCommand::HMAC => {
98 debug!("Client requested HMAC payload, sending response.");
99 writer.write(
100 FtlResponse::HMAC {
101 hmac_payload: client.hmac_payload.clone()
102 }
103 .to_string()
104 .as_bytes()
105 )
106 .await
107 .map_err(|_| FtlError::IoError)?;
108
109 Ok(())
110 }
111 FtlCommand::Connect { channel_id, hashed_hmac_payload } => {
112 debug!("Client is connecting, attempting to stream to {}.", &channel_id);
113 let known_key = self.get_stream_key(&channel_id)
114 .await.map_err(|_| FtlError::InvalidStreamKey)?;
115
116 let client_hash = hex::decode(hashed_hmac_payload[1..].to_string())
118 .map_err(|_| FtlError::DecodeError)?;
119
120 let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA512, &known_key.as_bytes());
121
122 ring::hmac::verify(
123 &key,
124 &hex::decode(client.hmac_payload.clone().into_bytes())
125 .map_err(|_| FtlError::DecodeError)?,
126 &client_hash.as_slice()
127 ).map_err(|_| FtlError::RingError)?;
128
129 debug!("Client was verified, ready to stream to {}.", &channel_id);
130 client.channel_id = Some(channel_id);
131
132 writer.write(
133 FtlResponse::Success
134 .to_string()
135 .as_bytes()
136 )
137 .await
138 .map_err(|_| FtlError::IoError)?;
139
140 Ok(())
141 }
142 FtlCommand::Attribute { key, value } => client.handshake.insert(key, value),
143 FtlCommand::Dot => {
144 if let Some(channel_id) = &client.channel_id {
145 let handshake = client.handshake.clone().finalise()?;
146 let udp_port = self.allocate_ingest(channel_id, handshake, client.stop_signal.clone())
147 .await.map_err(|_| FtlError::AllocateError)?;
148
149 debug!("Client is about to begin stream. Allocated port {}.", udp_port);
150 writer.write(
151 FtlResponse::Connect { udp_port }
152 .to_string()
153 .as_bytes()
154 )
155 .await
156 .map_err(|_| FtlError::IoError)?;
157
158 Ok(())
159 } else {
160 Err(FtlError::InvalidStreamKey)
161 }
162 }
163 FtlCommand::Ping { channel_id } => {
164 trace!("Client sent ping. {}", &channel_id);
165 writer.write(
166 FtlResponse::Pong
167 .to_string()
168 .as_bytes()
169 )
170 .await
171 .map_err(|_| FtlError::IoError)?;
172
173 Ok(())
174 }
175 FtlCommand::Disconnect => Err(FtlError::Disconnect)
176 }
177 }
178
179 async fn get_stream_key(&self, channel_id: &str) -> Result<String, ()>;
180 async fn allocate_ingest(&self, channel_id: &str, handshake: FtlHandshakeFinalised, stop_signal: Receiver<()>) -> Result<u16, ()>;
181}