1use std::{cmp::Ordering, io::ErrorKind, net::SocketAddr, time::Instant};
2
3use log::{debug, info};
4
5use ConnectError::*;
6use ConnectionResult::*;
7use RendezvousHsV5::*;
8use RendezvousState::*;
9
10use crate::{
11 connection::{Connection, ConnectionSettings},
12 packet::*,
13 protocol::handshake::Handshake,
14 settings::*,
15};
16
17use super::{
18 cookie::gen_cookie,
19 hsv5::{gen_hsv5_response, start_hsv5_initiation, GenHsv5Result, StartedInitiator},
20 ConnectError, ConnectionReject, ConnectionResult,
21};
22
23pub struct Rendezvous {
24 init_settings: ConnInitSettings,
25 local_addr: SocketAddr,
26 remote_public: SocketAddr,
27 state: RendezvousState,
28 cookie: i32,
29 last_packet: (Packet, SocketAddr),
30 last_send: Option<Instant>,
31 starting_seqnum: SeqNumber,
32}
33
34#[derive(Debug, Clone)]
37#[allow(clippy::large_enum_variant)]
38enum RendezvousState {
39 Waving,
40 AttentionInitiator(HandshakeVsInfo, StartedInitiator),
41 AttentionResponder(Instant), InitiatedResponder(ConnectionSettings), InitiatedInitiator(StartedInitiator),
44 FineResponder(ConnectionSettings),
45 FineInitiator(HandshakeVsInfo, StartedInitiator),
46}
47
48impl Rendezvous {
49 pub fn new(
50 local_addr: SocketAddr,
51 remote_public: SocketAddr,
52 init_settings: ConnInitSettings,
53 starting_seqnum: SeqNumber,
54 ) -> Self {
55 let cookie = gen_cookie(&local_addr);
56 let last_packet = (
57 ControlPacket {
58 dest_sockid: SocketId(0),
59 timestamp: TimeStamp::from_micros(0),
60 control_type: ControlTypes::Handshake(HandshakeControlInfo {
61 init_seq_num: starting_seqnum,
62 max_packet_size: init_settings.max_packet_size,
63 max_flow_size: init_settings.max_flow_size,
64 socket_id: init_settings.local_sockid,
65 shake_type: ShakeType::Waveahand,
66 peer_addr: local_addr.ip(),
67 syn_cookie: cookie, info: Rendezvous::empty_flags(),
69 }),
70 }
71 .into(),
72 remote_public,
73 );
74
75 Self {
76 state: Waving,
77 cookie,
78 last_packet,
79 init_settings,
80 local_addr,
81 remote_public,
82 last_send: None,
83 starting_seqnum,
84 }
85 }
86}
87
/// Role this side plays in the HSv5 extension exchange.
///
/// The role is chosen in `handle_waving` from the sign of the (wrapping)
/// difference between the local cookie and the peer's cookie.
#[derive(Debug, Clone)]
enum RendezvousHsV5 {
    /// The local cookie compares greater than the peer's.
    Initiator,
    /// The local cookie compares less than the peer's.
    Responder,
}
93
94fn get_handshake(packet: &Packet) -> Result<&HandshakeControlInfo, ConnectError> {
95 match packet {
96 Packet::Control(ControlPacket {
97 control_type: ControlTypes::Handshake(info),
98 ..
99 }) => Ok(info),
100 Packet::Control(ControlPacket { control_type, .. }) => {
101 Err(HandshakeExpected(control_type.clone()))
102 }
103 Packet::Data(data) => Err(ControlExpected(data.clone())),
104 }
105}
106
107fn extract_ext_info(
108 info: &HandshakeControlInfo,
109) -> Result<Option<&SrtControlPacket>, ConnectError> {
110 match &info.info {
111 HandshakeVsInfo::V5(hs) => Ok(hs.ext_hs.as_ref()),
112 _ => Err(UnsupportedProtocolVersion(4)),
113 }
114}
115
116impl Rendezvous {
117 fn empty_flags() -> HandshakeVsInfo {
118 HandshakeVsInfo::V5(HsV5Info::default())
119 }
120
121 fn transition(&mut self, state: RendezvousState) {
122 debug!(
123 "Rendezvous {:?} transitioning from {:?} to {:?}",
124 self.init_settings.local_sockid, self.state, state,
125 );
126 self.state = state
127 }
128
129 fn gen_packet(&self, shake_type: ShakeType, info: HandshakeVsInfo) -> HandshakeControlInfo {
130 HandshakeControlInfo {
131 init_seq_num: self.starting_seqnum,
132 max_packet_size: self.init_settings.max_packet_size,
133 max_flow_size: self.init_settings.max_flow_size,
134 socket_id: self.init_settings.local_sockid,
135 shake_type,
136 peer_addr: self.local_addr.ip(),
137 syn_cookie: self.cookie, info,
139 }
140 }
141
142 fn send(&mut self, dest_sockid: SocketId, packet: HandshakeControlInfo) -> ConnectionResult {
143 let pack_pair = (
144 ControlPacket {
145 timestamp: TimeStamp::from_micros(0),
146 dest_sockid,
147 control_type: ControlTypes::Handshake(packet),
148 }
149 .into(),
150 self.remote_public,
151 );
152 self.last_packet = pack_pair.clone();
153 SendPacket(pack_pair)
154 }
155
156 fn send_conclusion(
157 &mut self,
158 dest_sockid: SocketId,
159 info: HandshakeVsInfo,
160 ) -> ConnectionResult {
161 self.send(dest_sockid, self.gen_packet(ShakeType::Conclusion, info))
162 }
163
164 fn make_rejection(
169 &mut self,
170 response_to: &HandshakeControlInfo,
171 timestamp: TimeStamp,
172 r: ConnectionReject,
173 ) -> ConnectionResult {
174 Reject(
175 Some((
176 ControlPacket {
177 timestamp,
178 dest_sockid: response_to.socket_id,
179 control_type: ControlTypes::Handshake(HandshakeControlInfo {
180 shake_type: ShakeType::Rejection(r.reason()),
181 socket_id: self.init_settings.local_sockid,
182 ..response_to.clone()
183 }),
184 }
185 .into(),
186 self.remote_public,
187 )),
188 r,
189 )
190 }
191
192 fn set_connected(
193 &self,
194 settings: ConnectionSettings,
195 agreement: Option<HandshakeControlInfo>,
196 to_send: Option<HandshakeControlInfo>,
197 ) -> ConnectionResult {
198 Connected(
199 to_send.map(|to_send| {
200 (
201 ControlPacket {
202 timestamp: TimeStamp::from_micros(0),
203 dest_sockid: settings.remote_sockid,
204 control_type: ControlTypes::Handshake(to_send),
205 }
206 .into(),
207 self.remote_public,
208 )
209 }),
210 Connection {
211 settings,
212 handshake: Handshake::Rendezvous(agreement.map(ControlTypes::Handshake)),
213 },
214 )
215 }
216
217 fn handle_waving(
218 &mut self,
219 info: &HandshakeControlInfo,
220 timestamp: TimeStamp,
221 now: Instant,
222 ) -> ConnectionResult {
223 assert!(matches!(self.state, Waving));
224
225 let role = match self.cookie.wrapping_sub(info.syn_cookie).cmp(&0) {
227 Ordering::Greater => Initiator,
228 Ordering::Less => Responder,
229 Ordering::Equal => return NotHandled(CookiesMatched(self.cookie)),
230 };
231
232 debug!(
233 "Rendezvous socket {:?} is {:?}",
234 self.init_settings.local_sockid, role
235 );
236
237 match (info.shake_type, role) {
238 (ShakeType::Waveahand, Initiator) => {
239 let (hsv5, initiator) =
241 start_hsv5_initiation(self.init_settings.clone(), None, now);
242
243 self.transition(AttentionInitiator(hsv5.clone(), initiator));
244
245 self.send_conclusion(info.socket_id, hsv5)
246 }
247 (ShakeType::Waveahand, Responder) => {
248 self.starting_seqnum = info.init_seq_num; self.transition(AttentionResponder(now));
250 self.send_conclusion(info.socket_id, Rendezvous::empty_flags())
251 }
252 (ShakeType::Conclusion, role) => {
253 let ext_info = match extract_ext_info(info) {
254 Ok(ei) => ei,
255 Err(e) => return NotHandled(e),
256 };
257 let hsv5_shake = match (&role, ext_info) {
258 (Responder, Some(SrtControlPacket::HandshakeRequest(_))) => {
259 let (hsv5, connection) = match gen_hsv5_response(
260 &mut self.init_settings,
261 info,
262 self.remote_public,
263 match self.last_send {
264 Some(induction_time) => induction_time,
265 None => {
266 return ConnectionResult::NotHandled(
267 ConnectError::WavehandExpected(info.clone()),
268 );
269 }
270 },
271 now,
272 ) {
273 GenHsv5Result::Accept(h, c) => (h, c),
274 GenHsv5Result::NotHandled(e) => return NotHandled(e),
275 GenHsv5Result::Reject(r) => {
276 return self.make_rejection(info, timestamp, r)
277 }
278 };
279 self.starting_seqnum = info.init_seq_num; self.transition(FineResponder(connection));
281
282 hsv5
283 }
284 (Initiator, None) => {
285 let (hsv5, initiator) =
286 start_hsv5_initiation(self.init_settings.clone(), None, now); self.transition(FineInitiator(hsv5.clone(), initiator));
288 hsv5
289 }
290 (Responder, Some(_)) => {
291 return NotHandled(ExpectedHsReq);
292 }
293 (Initiator, Some(_)) => return NotHandled(ExpectedNoExtFlags),
294 (Responder, None) => return NotHandled(ExpectedExtFlags),
295 };
296 self.send_conclusion(info.socket_id, hsv5_shake)
297 }
298 (ShakeType::Agreement, _) => NoAction,
299 (ShakeType::Induction, _) => NotHandled(RendezvousExpected(info.clone())),
300 (ShakeType::Rejection(rej), _) => Reject(None, ConnectionReject::Rejected(rej)),
301 }
302 }
303
304 fn handle_attention_initiator(
305 &mut self,
306 info: &HandshakeControlInfo,
307 hsv5: HandshakeVsInfo,
308 initiator: StartedInitiator,
309 now: Instant,
310 ) -> ConnectionResult {
311 match info.shake_type {
312 ShakeType::Conclusion => match extract_ext_info(info) {
313 Ok(Some(SrtControlPacket::HandshakeResponse(_))) => {
314 let agreement =
315 self.gen_packet(ShakeType::Agreement, Rendezvous::empty_flags());
316
317 let settings =
318 match initiator.finish_hsv5_initiation(info, self.remote_public, now) {
319 Ok(s) => s,
320 Err(r) => return NotHandled(r),
321 };
322
323 self.set_connected(settings, Some(agreement.clone()), Some(agreement))
324 }
325 Ok(Some(_)) => NotHandled(ExpectedHsResp),
326 Ok(None) => {
327 self.transition(InitiatedInitiator(initiator));
328 self.send_conclusion(info.socket_id, hsv5)
329 }
330 Err(e) => NotHandled(e),
331 },
332 _ => NoAction, }
334 }
335
336 fn handle_attention_responder(
337 &mut self,
338 info: &HandshakeControlInfo,
339 timestamp: TimeStamp,
340 induction_time: Instant,
341 now: Instant,
342 ) -> ConnectionResult {
343 match info.shake_type {
344 ShakeType::Conclusion => {
345 match extract_ext_info(info) {
346 Ok(Some(SrtControlPacket::HandshakeRequest(_))) => {} Ok(Some(_)) => return NotHandled(ExpectedHsReq),
348 Ok(None) => return NotHandled(ExpectedExtFlags),
349 Err(e) => return NotHandled(e),
350 };
351 let (hsv5, connection) = match gen_hsv5_response(
352 &mut self.init_settings,
353 info,
354 self.remote_public,
355 induction_time,
356 now,
357 ) {
358 GenHsv5Result::Accept(h, c) => (h, c),
359 GenHsv5Result::NotHandled(e) => return NotHandled(e),
360 GenHsv5Result::Reject(r) => return self.make_rejection(info, timestamp, r),
361 };
362 self.starting_seqnum = info.init_seq_num; self.transition(InitiatedResponder(connection));
364
365 self.send_conclusion(info.socket_id, hsv5)
366 }
367 _ => NoAction,
368 }
369 }
370
371 fn handle_fine_initiator(
372 &mut self,
373 info: &HandshakeControlInfo,
374 hsv5: HandshakeVsInfo,
375 initiator: StartedInitiator,
376 now: Instant,
377 ) -> ConnectionResult {
378 match info.shake_type {
379 ShakeType::Conclusion => match extract_ext_info(info) {
380 Ok(Some(SrtControlPacket::HandshakeResponse(_))) => {
381 let agreement = self.gen_packet(ShakeType::Agreement, hsv5);
382
383 let settings =
384 match initiator.finish_hsv5_initiation(info, self.remote_public, now) {
385 Ok(s) => s,
386 Err(r) => return NotHandled(r),
387 };
388
389 self.set_connected(settings, Some(agreement.clone()), Some(agreement))
390 }
391 Ok(Some(_)) => NotHandled(ExpectedHsResp),
392 Ok(None) => NotHandled(ExpectedExtFlags),
393 Err(e) => NotHandled(e),
394 },
395 _ => NoAction, }
397 }
398
399 fn handle_fine_responder(
400 &mut self,
401 packet: &Packet,
402 connection: ConnectionSettings,
403 ) -> ConnectionResult {
404 match packet {
405 Packet::Data(_)
406 | Packet::Control(ControlPacket {
407 control_type:
408 ControlTypes::Handshake(HandshakeControlInfo {
409 shake_type: ShakeType::Agreement,
410 ..
411 }),
412 ..
413 })
414 | Packet::Control(ControlPacket {
415 control_type: ControlTypes::KeepAlive,
416 ..
417 }) => return self.set_connected(connection, None, None),
418 _ => {}
419 }
420 NoAction
421 }
422
423 fn handle_initiated_initiator(
424 &mut self,
425 info: &HandshakeControlInfo,
426 initiator: StartedInitiator,
427 now: Instant,
428 ) -> ConnectionResult {
429 match info.shake_type {
430 ShakeType::Conclusion => match extract_ext_info(info) {
431 Ok(Some(SrtControlPacket::HandshakeResponse(_))) => {
432 let connection =
433 match initiator.finish_hsv5_initiation(info, self.remote_public, now) {
434 Ok(c) => c,
435 Err(e) => return NotHandled(e),
436 };
437
438 let agreement =
439 self.gen_packet(ShakeType::Agreement, Rendezvous::empty_flags());
440
441 self.set_connected(connection, Some(agreement.clone()), Some(agreement))
443 }
444 Ok(Some(_)) => NotHandled(ExpectedHsResp),
445 Ok(None) => NotHandled(ExpectedExtFlags), Err(e) => NotHandled(e),
447 },
448 _ => NoAction, }
450 }
451
452 fn handle_initiated_responder(
453 &mut self,
454 packet: &Packet,
455 connection: ConnectionSettings,
456 ) -> ConnectionResult {
457 if let Ok(info) = get_handshake(packet) {
459 match (info.shake_type, extract_ext_info(info)) {
460 (_, Err(e)) => return NotHandled(e),
461 (ShakeType::Conclusion, Ok(Some(SrtControlPacket::HandshakeRequest(_)))) => {
462 return NoAction; }
464 (ShakeType::Conclusion, Ok(Some(_))) => return NotHandled(ExpectedHsReq),
465 (ShakeType::Waveahand, _) => return NotHandled(AgreementExpected(info.clone())),
466 _ => {}
467 }
468 }
469
470 self.set_connected(
471 connection,
472 None,
473 Some(self.gen_packet(ShakeType::Agreement, Rendezvous::empty_flags())),
474 )
475 }
476
477 pub fn handle_packet(&mut self, packet: ReceivePacketResult, now: Instant) -> ConnectionResult {
478 use ReceivePacketError::*;
479 match packet {
480 Ok((packet, from)) => {
481 if from != self.remote_public {
482 return NotHandled(UnexpectedHost(self.remote_public, from));
483 }
484
485 let hs = get_handshake(&packet);
486 match (self.state.clone(), hs) {
487 (Waving, Ok(hs)) => self.handle_waving(hs, packet.timestamp(), now),
488 (AttentionInitiator(hsv5, initiator), Ok(hs)) => {
489 self.handle_attention_initiator(hs, hsv5, initiator, now)
490 }
491 (AttentionResponder(induction_time), Ok(hs)) => {
492 self.handle_attention_responder(hs, packet.timestamp(), induction_time, now)
493 }
494 (InitiatedInitiator(initiator), Ok(hs)) => {
495 self.handle_initiated_initiator(hs, initiator, now)
496 }
497 (InitiatedResponder(connection), _) => {
498 self.handle_initiated_responder(&packet, connection)
499 }
500 (FineInitiator(hsv5, initiator), Ok(hs)) => {
501 self.handle_fine_initiator(hs, hsv5, initiator, now)
502 }
503 (FineResponder(conn), _) => self.handle_fine_responder(&packet, conn),
504 (_, Err(e)) => NotHandled(e),
505 }
506 }
507 Err(Io(error)) if error.kind() == ErrorKind::ConnectionReset => {
508 info!(
509 "ConnectionReset received, rendezvous peer may not have opened the port yet..."
510 );
511 NoAction
512 }
513 Err(Io(error)) => Failure(error),
514 Err(Parse(e)) => NotHandled(ConnectError::ParseFailed(e)),
515 }
516 }
517
518 pub fn handle_tick(&mut self, now: Instant) -> ConnectionResult {
519 self.last_send = Some(now);
520 SendPacket(self.last_packet.clone())
521 }
522}