tsproto 0.2.0

An implementation of the TeamSpeak3 protocol as a library for use in clients and bots.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
use std::collections::VecDeque;
use std::io;
use std::mem;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::u16;

use futures::prelude::*;
use generic_array::typenum::consts::U16;
use generic_array::GenericArray;
use num_traits::ToPrimitive;
use slog::{o, Logger};
use tokio::io::ReadBuf;
use tokio::net::UdpSocket;
use tsproto_packets::packets::*;
use tsproto_types::crypto::EccKeyPubP256;

use crate::packet_codec::PacketCodec;
use crate::resend::{PacketId, PartialPacketId, Resender, ResenderState};
use crate::{Error, Result, MAX_UDP_PACKET_LENGTH, UDP_SINK_CAPACITY};

/// The needed functions, this can be used to abstract from the underlying
/// transport and allows simulation.
pub trait Socket {
	/// Poll for an incoming udp packet and write it into `buf`.
	///
	/// When ready, returns the address that is reported for the received
	/// packet.
	fn poll_recv_from(&self, cx: &mut Context, buf: &mut ReadBuf) -> Poll<io::Result<SocketAddr>>;
	/// Poll to send the bytes in `buf` as one udp packet to `target`.
	///
	/// When ready, returns the number of bytes that were sent.
	fn poll_send_to(
		&self, cx: &mut Context, buf: &[u8], target: SocketAddr,
	) -> Poll<io::Result<usize>>;
	/// Get the local address of this socket.
	fn local_addr(&self) -> io::Result<SocketAddr>;
}

/// A cache for the key and nonce for a generation id.
/// This has to be stored for each packet type.
#[derive(Debug)]
pub struct CachedKey {
	/// The generation id that `key` and `nonce` were cached for.
	pub generation_id: u32,
	/// The cached 16 byte key.
	pub key: GenericArray<u8, U16>,
	/// The cached 16 byte nonce.
	pub nonce: GenericArray<u8, U16>,
}

/// Data that has to be stored for a connection when it is connected.
pub struct ConnectedParams {
	/// The client id of this connection.
	pub c_id: u16,
	/// If voice packets should be encrypted.
	pub voice_encryption: bool,

	/// The public key of the other side.
	pub public_key: EccKeyPubP256,
	/// The iv used to encrypt and decrypt packets.
	pub shared_iv: [u8; 64],
	/// The mac used for unencrypted packets.
	pub shared_mac: [u8; 8],
	/// Cached key and nonce per packet type and for server to client (without
	/// client id inside the packet) and client to server communication.
	///
	/// The outer array has one entry per packet type, the inner array one
	/// entry per direction.
	pub key_cache: [[CachedKey; 2]; 8],
}

/// An event that originates from a tsproto raw connection.
///
/// Events are passed to all listeners in `Connection::event_listeners`.
#[derive(Debug)]
pub enum Event<'a> {
	/// An udp packet from the address of the other side was received and
	/// parsed successfully.
	ReceiveUdpPacket(&'a InUdpPacket<'a>),
	/// A packet was received.
	///
	/// NOTE(review): this event is not emitted in this file, presumably the
	/// packet codec emits it after decoding; confirm there.
	ReceivePacket(&'a InPacket<'a>),
	/// An udp packet was handed to the socket.
	SendUdpPacket(&'a OutUdpPacket),
	/// A packet is about to be encoded and sent.
	SendPacket(&'a OutPacket),
}

/// An item that originates from a tsproto raw event stream.
///
/// The disconnected event is signaled by returning `None` from the stream.
#[derive(Debug)]
pub enum StreamItem {
	/// An incoming command packet.
	Command(InCommandBuf),
	/// An incoming audio packet.
	Audio(InAudioBuf),
	/// An incoming init packet that was sent from a client to a server.
	C2SInit(InC2SInitBuf),
	/// An incoming init packet that was sent from a server to a client.
	S2CInit(InS2CInitBuf),
	/// All packets with an id less or equal to this id were acknowledged.
	AckPacket(PacketId),
	/// The network statistics were updated.
	NetworkStatsUpdated,
	/// A recoverable error, e.g. an udp packet that came from a wrong address
	/// or that could not be parsed.
	Error(Error),
}

/// A callback that gets called for every [`Event`] of a connection.
type EventListener = Box<dyn for<'a> Fn(&'a Event<'a>) + Send>;

/// Represents a currently alive connection.
pub struct Connection {
	/// If this is the client side of the connection.
	///
	/// This decides in which direction incoming packets are parsed and which
	/// command packet id counter is pre-set when creating the connection.
	pub is_client: bool,
	/// The logger of this connection, the local and remote address are
	/// attached to it when the connection is created.
	pub logger: Logger,
	/// The parameters of this connection, if it is already established.
	pub params: Option<ConnectedParams>,
	/// The address of the other side, where packets are coming from and going
	/// to.
	pub address: SocketAddr,

	/// Sends and resends `Init`, `Command` and `CommandLow` packets, sends
	/// pings and holds the state of the connection.
	pub resender: Resender,
	/// Encodes outgoing and handles incoming packets, holds the packet id
	/// counters.
	pub codec: PacketCodec,
	/// The socket that is used to send and receive udp packets.
	pub udp_socket: Box<dyn Socket + Send>,
	/// The buffer that incoming udp packets are read into.
	///
	/// It is taken out of the connection for every received packet, a buffer
	/// can be given back with `hand_back_buffer` to be reused.
	udp_buffer: Vec<u8>,

	/// A buffer of packets that should be returned from the stream.
	///
	/// If a new udp packet is received and we already received the following
	/// ids, we can get multiple packets back at once. As we can only return one
	/// from the stream, the rest is stored here.
	pub(crate) stream_items: VecDeque<StreamItem>,

	/// The queue of non-command packets that should be sent.
	///
	/// These packets are not influenced by congestion control.
	/// If it gets too long, we don't poll from the `udp_socket` anymore.
	acks_to_send: VecDeque<OutUdpPacket>,

	/// The listeners that get notified about every [`Event`] of this
	/// connection.
	pub event_listeners: Vec<EventListener>,
}

impl Socket for UdpSocket {
	fn poll_recv_from(&self, cx: &mut Context, buf: &mut ReadBuf) -> Poll<io::Result<SocketAddr>> {
		self.poll_recv_from(cx, buf)
	}

	fn poll_send_to(
		&self, cx: &mut Context, buf: &[u8], target: SocketAddr,
	) -> Poll<io::Result<usize>> {
		self.poll_send_to(cx, buf, target)
	}

	fn local_addr(&self) -> io::Result<SocketAddr> { self.local_addr() }
}

impl Default for CachedKey {
	fn default() -> Self {
		CachedKey { generation_id: u32::max_value(), key: [0; 16].into(), nonce: [0; 16].into() }
	}
}

impl ConnectedParams {
	/// Fills the parameters for a connection with their default state.
	pub fn new(public_key: EccKeyPubP256, shared_iv: [u8; 64], shared_mac: [u8; 8]) -> Self {
		Self {
			c_id: 0,
			voice_encryption: true,
			public_key,
			shared_iv,
			shared_mac,
			key_cache: Default::default(),
		}
	}
}

impl Connection {
	pub fn new(
		is_client: bool, logger: Logger, address: SocketAddr, udp_socket: Box<dyn Socket + Send>,
	) -> Self {
		let logger = logger.new(o!("local_addr" => udp_socket.local_addr().unwrap().to_string(),
				"remote_addr" => address.to_string()));

		let mut res = Self {
			is_client,
			logger,
			params: None,
			address,
			resender: Default::default(),
			codec: Default::default(),
			udp_socket,
			udp_buffer: Default::default(),

			stream_items: Default::default(),
			acks_to_send: Default::default(),
			event_listeners: Default::default(),
		};
		if is_client {
			// The first command is sent as part of the C2SInit::Init4 packet
			// so it does not get registered automatically.
			res.codec.outgoing_p_ids[PacketType::Command.to_usize().unwrap()] =
				PartialPacketId { generation_id: 0, packet_id: 1 };
		} else {
			res.codec.incoming_p_ids[PacketType::Command.to_usize().unwrap()] =
				PartialPacketId { generation_id: 0, packet_id: 1 };
		}
		res
	}

	/// Check if a given id is in the receive window.
	///
	/// The receive window starts at the next expected packet id of `p_type`
	/// and spans the following `u16::MAX / 2` ids, it wraps around at the end
	/// of the id space. `Init` packets are always accepted.
	///
	/// For ids outside of the window, an id smaller than the next expected id
	/// is attributed to the current generation, a larger id to the previous
	/// generation.
	///
	/// Returns
	/// 1. If the packet id is inside the receive window
	/// 2. The generation of the packet
	/// 3. The minimum accepted packet id
	/// 4. The end of the receive window, this is the first id after the
	///    minimum which is not accepted anymore
	pub(crate) fn in_receive_window(&self, p_type: PacketType, p_id: u16) -> (bool, u32, u16, u16) {
		if p_type == PacketType::Init {
			return (true, 0, 0, 0);
		}
		let type_i = p_type.to_usize().unwrap();
		// Receive window is the next half of ids
		let cur_next = self.codec.incoming_p_ids[type_i].packet_id;
		// `next_gen` is set if the window wraps around into the next generation
		let (limit, next_gen) = cur_next.overflowing_add(u16::MAX / 2);
		let gen = self.codec.incoming_p_ids[type_i].generation_id;
		let in_recv_win = (!next_gen && p_id >= cur_next && p_id < limit)
			|| (next_gen && (p_id >= cur_next || p_id < limit));
		let gen_id = if in_recv_win {
			if next_gen && p_id < limit { gen + 1 } else { gen }
		} else if p_id < cur_next {
			gen
		} else {
			// The id belongs to the previous generation. The other side
			// controls `p_id`, so this branch can be reached while we are
			// still in generation 0. A plain `gen - 1` would panic in builds
			// with overflow checks, so wrap explicitly, which is what builds
			// without overflow checks did before.
			gen.wrapping_sub(1)
		};

		(in_recv_win, gen_id, cur_next, limit)
	}

	/// Notify all registered event listeners about `event`.
	///
	/// The listeners are called in the order in which they are stored.
	pub fn send_event(&self, event: &Event) {
		self.event_listeners.iter().for_each(|listener| listener(event));
	}

	/// Give a buffer back to the connection so it can be reused to receive
	/// udp packets.
	///
	/// The buffer is only stored if the current receive buffer is too small
	/// for a whole udp packet and `buffer` is large enough, otherwise it is
	/// dropped.
	pub fn hand_back_buffer(&mut self, buffer: Vec<u8>) {
		let own_too_small = self.udp_buffer.capacity() < MAX_UDP_PACKET_LENGTH;
		let new_large_enough = buffer.capacity() >= MAX_UDP_PACKET_LENGTH;
		if own_too_small && new_large_enough {
			self.udp_buffer = buffer;
		}
	}

	/// Send as many packets from the `acks_to_send` queue as possible.
	///
	/// Stops without an error when the queue is empty or when the socket is
	/// not ready to send. A packet is only removed from the queue after it was
	/// sent successfully and reported to the resender.
	fn poll_send_acks(&mut self, cx: &mut Context) -> Result<()> {
		loop {
			let next = match self.acks_to_send.front() {
				Some(packet) => packet,
				None => return Ok(()),
			};

			match self.poll_send_udp_packet(cx, next) {
				// The socket is busy, try again on the next poll
				Poll::Pending => return Ok(()),
				Poll::Ready(res) => res?,
			}

			self.resender.handle_loss_outgoing(next);
			self.acks_to_send.pop_front();
		}
	}

	/// Receive udp packets from the socket until one of them yields a stream
	/// item.
	///
	/// Nothing is received as long as the queue of acks that wait to be sent
	/// has reached `UDP_SINK_CAPACITY`. An error from the socket or from
	/// handling a packet is returned as `Err`.
	fn poll_incoming_udp_packet(&mut self, cx: &mut Context) -> Poll<Result<StreamItem>> {
		if self.acks_to_send.len() >= UDP_SINK_CAPACITY {
			return Poll::Pending;
		}

		loop {
			// Make sure that a whole udp packet fits into the buffer
			if self.udp_buffer.len() != MAX_UDP_PACKET_LENGTH {
				self.udp_buffer.resize(MAX_UDP_PACKET_LENGTH, 0);
			}

			// Poll udp_socket
			let (received, sender) = {
				let mut read_buf = ReadBuf::new(&mut self.udp_buffer);
				match self.udp_socket.poll_recv_from(cx, &mut read_buf) {
					Poll::Ready(Ok(addr)) => (read_buf.filled().len(), addr),
					// Udp socket closed
					Poll::Ready(Err(e)) => return Poll::Ready(Err(Error::Network(e))),
					Poll::Pending => return Poll::Pending,
				}
			};

			// Hand the buffer over, cut down to the received bytes
			let mut datagram = mem::take(&mut self.udp_buffer);
			datagram.truncate(received);
			if let Err(e) = self.handle_udp_packet(cx, datagram, sender) {
				return Poll::Ready(Err(e));
			}
			if let Some(item) = self.stream_items.pop_front() {
				return Poll::Ready(Ok(item));
			}
		}
	}

	/// Handle a single received udp packet.
	///
	/// A packet from an unexpected address or a packet that cannot be parsed
	/// is not a fatal error, it is reported as `StreamItem::Error` in the
	/// stream items instead. Valid packets are announced to the event
	/// listeners and the resender and then passed on to the packet codec.
	fn handle_udp_packet(
		&mut self, cx: &mut Context, udp_buffer: Vec<u8>, addr: SocketAddr,
	) -> Result<()> {
		if addr != self.address {
			self.stream_items.push_back(StreamItem::Error(Error::WrongAddress));
			return Ok(());
		}

		// We receive what the other side sends
		let dir = if self.is_client { Direction::S2C } else { Direction::C2S };
		match InPacket::try_new(dir, &udp_buffer) {
			Ok(parsed) => {
				let packet = InUdpPacket(parsed);
				self.send_event(&Event::ReceiveUdpPacket(&packet));
			}
			Err(e) => {
				self.stream_items.push_back(StreamItem::Error(Error::PacketParse("udp", e)));
				return Ok(());
			}
		}

		self.resender.received_packet();
		PacketCodec::handle_udp_packet(self, cx, udp_buffer)?;

		Ok(())
	}

	/// Try to send an ack packet immediately.
	///
	/// If the socket is not ready, the encoded packet is appended to the ack
	/// queue and sent later.
	///
	/// # Panics
	///
	/// Panics if encoding `packet` does not yield exactly one udp packet.
	pub(crate) fn send_ack_packet(&mut self, cx: &mut Context, packet: OutPacket) -> Result<()> {
		self.send_event(&Event::SendPacket(&packet));
		let mut encoded = PacketCodec::encode_packet(self, packet)?;
		assert_eq!(
			encoded.len(),
			1,
			"Encoding an ack packet should only yield a single packet"
		);
		let udp_packet = encoded.pop().unwrap();

		let sent = match self.poll_send_udp_packet(cx, &udp_packet) {
			Poll::Pending => {
				// Socket is busy, send it with the other queued acks
				self.acks_to_send.push_back(udp_packet);
				return Ok(());
			}
			Poll::Ready(res) => res,
		};
		if sent.is_ok() {
			self.resender.handle_loss_outgoing(&udp_packet);
		}
		sent
	}

	/// Encode a packet and add the resulting udp packets to the send queue.
	///
	/// This function buffers indefinitely. To prevent using a large amount of
	/// memory, check `is_send_queue_full` first and only send a packet if
	/// that function returns `false`.
	///
	/// The returned `PacketId` is the id of the last udp packet that was
	/// created for `packet`. When this id is acknowledged, the packet was
	/// successfully received by the other side of the connection.
	///
	/// # Panics
	///
	/// Panics if encoding `packet` yields no udp packet at all.
	pub fn send_packet(&mut self, packet: OutPacket) -> Result<PacketId> {
		self.send_event(&Event::SendPacket(&packet));
		let encoded = PacketCodec::encode_packet(self, packet)?;

		let last_id: PacketId = encoded.last().unwrap().into();
		encoded.into_iter().for_each(|udp_packet| self.send_udp_packet(udp_packet));
		Ok(last_id)
	}

	/// Add an udp packet to the send queue.
	///
	/// `Init`, `Command` and `CommandLow` packets are handed to the resender,
	/// all other packets are appended to the ack queue.
	pub fn send_udp_packet(&mut self, packet: OutUdpPacket) {
		let uses_resender = matches!(
			packet.packet_type(),
			PacketType::Init | PacketType::Command | PacketType::CommandLow
		);
		if uses_resender {
			Resender::send_packet(self, packet);
		} else {
			self.acks_to_send.push_back(packet);
		}
	}

	pub fn poll_send_udp_packet(
		&self, cx: &mut Context, packet: &OutUdpPacket,
	) -> Poll<Result<()>> {
		Self::static_poll_send_udp_packet(
			&*self.udp_socket,
			self.address,
			&self.event_listeners,
			cx,
			packet,
		)
	}

	/// Remember to add the size of the sent packet to the stats in the resender.
	pub fn static_poll_send_udp_packet(
		udp_socket: &dyn Socket, address: SocketAddr, event_listeners: &[EventListener],
		cx: &mut Context, packet: &OutUdpPacket,
	) -> Poll<Result<()>> {
		let data = packet.data().data();
		match udp_socket.poll_send_to(cx, data, address).map_err(Error::Network)? {
			Poll::Pending => Poll::Pending,
			Poll::Ready(size) => {
				let event = Event::SendUdpPacket(&packet);
				for l in event_listeners {
					l(&event)
				}

				if size != data.len() {
					Poll::Ready(Err(Error::Network(std::io::Error::new(
						std::io::ErrorKind::Other,
						"Failed to send whole udp packet",
					))))
				} else {
					Poll::Ready(Ok(()))
				}
			}
		}
	}

	/// If the send queue of the resender is full.
	///
	/// `send_packet` should only be called when this returns `false`.
	pub fn is_send_queue_full(&self) -> bool {
		let resender = &self.resender;
		resender.is_full()
	}
	/// If the send queue of the resender is empty.
	pub fn is_send_queue_empty(&self) -> bool {
		let resender = &self.resender;
		resender.is_empty()
	}
}

/// Poll for events.
///
/// `Ok(StreamItem::Error)` is recoverable, `Err()` is not.
///
/// Every poll does the following things in this order:
/// 1. Send queued ack packets
/// 2. End the stream if the connection is disconnected and no acks are left
/// 3. Use the resender to resend packets if necessary
/// 4. Use the resender to send ping packets if necessary
/// 5. Return an already buffered stream item
/// 6. Check for new udp packets
impl Stream for Connection {
	type Item = Result<StreamItem>;

	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
		let this: &mut Connection = &mut *self;

		if let Err(e) = this.poll_send_acks(cx) {
			return Poll::Ready(Some(Err(e)));
		}

		// Send all ack packets and return `None` afterwards
		let disconnected = this.resender.get_state() == ResenderState::Disconnected;
		if disconnected && this.acks_to_send.is_empty() {
			return Poll::Ready(None);
		}

		// Use the resender to resend packets
		if let Err(e) = Resender::poll_resend(&mut *this, cx) {
			return Poll::Ready(Some(Err(e)));
		}

		// Use the resender to send pings
		if let Err(e) = Resender::poll_ping(&mut *this, cx) {
			return Poll::Ready(Some(Err(e)));
		}

		// Return existing stream_items
		if let Some(item) = this.stream_items.pop_front() {
			return Poll::Ready(Some(Ok(item)));
		}

		// Check for new udp packets
		this.poll_incoming_udp_packet(cx).map(Some)
	}
}