srt_protocol/protocol/pending_connection/
rendezvous.rs

1use std::{cmp::Ordering, io::ErrorKind, net::SocketAddr, time::Instant};
2
3use log::{debug, info};
4
5use ConnectError::*;
6use ConnectionResult::*;
7use RendezvousHsV5::*;
8use RendezvousState::*;
9
10use crate::{
11    connection::{Connection, ConnectionSettings},
12    packet::*,
13    protocol::handshake::Handshake,
14    settings::*,
15};
16
17use super::{
18    cookie::gen_cookie,
19    hsv5::{gen_hsv5_response, start_hsv5_initiation, GenHsv5Result, StartedInitiator},
20    ConnectError, ConnectionReject, ConnectionResult,
21};
22
23pub struct Rendezvous {
24    init_settings: ConnInitSettings,
25    local_addr: SocketAddr,
26    remote_public: SocketAddr,
27    state: RendezvousState,
28    cookie: i32,
29    last_packet: (Packet, SocketAddr),
30    last_send: Option<Instant>,
31    starting_seqnum: SeqNumber,
32}
33
34// see haivision/srt/docs/handshake.md for documentation
35
36#[derive(Debug, Clone)]
37#[allow(clippy::large_enum_variant)]
38enum RendezvousState {
39    Waving,
40    AttentionInitiator(HandshakeVsInfo, StartedInitiator),
41    AttentionResponder(Instant),            // induction time
42    InitiatedResponder(ConnectionSettings), // responders always have the handshake when they transition to initiated
43    InitiatedInitiator(StartedInitiator),
44    FineResponder(ConnectionSettings),
45    FineInitiator(HandshakeVsInfo, StartedInitiator),
46}
47
48impl Rendezvous {
49    pub fn new(
50        local_addr: SocketAddr,
51        remote_public: SocketAddr,
52        init_settings: ConnInitSettings,
53        starting_seqnum: SeqNumber,
54    ) -> Self {
55        let cookie = gen_cookie(&local_addr);
56        let last_packet = (
57            ControlPacket {
58                dest_sockid: SocketId(0),
59                timestamp: TimeStamp::from_micros(0),
60                control_type: ControlTypes::Handshake(HandshakeControlInfo {
61                    init_seq_num: starting_seqnum,
62                    max_packet_size: init_settings.max_packet_size,
63                    max_flow_size: init_settings.max_flow_size,
64                    socket_id: init_settings.local_sockid,
65                    shake_type: ShakeType::Waveahand,
66                    peer_addr: local_addr.ip(),
67                    syn_cookie: cookie, // TODO: !!
68                    info: Rendezvous::empty_flags(),
69                }),
70            }
71            .into(),
72            remote_public,
73        );
74
75        Self {
76            state: Waving,
77            cookie,
78            last_packet,
79            init_settings,
80            local_addr,
81            remote_public,
82            last_send: None,
83            starting_seqnum,
84        }
85    }
86}
87
88#[derive(Debug, Clone)]
89enum RendezvousHsV5 {
90    Initiator,
91    Responder,
92}
93
94fn get_handshake(packet: &Packet) -> Result<&HandshakeControlInfo, ConnectError> {
95    match packet {
96        Packet::Control(ControlPacket {
97            control_type: ControlTypes::Handshake(info),
98            ..
99        }) => Ok(info),
100        Packet::Control(ControlPacket { control_type, .. }) => {
101            Err(HandshakeExpected(control_type.clone()))
102        }
103        Packet::Data(data) => Err(ControlExpected(data.clone())),
104    }
105}
106
107fn extract_ext_info(
108    info: &HandshakeControlInfo,
109) -> Result<Option<&SrtControlPacket>, ConnectError> {
110    match &info.info {
111        HandshakeVsInfo::V5(hs) => Ok(hs.ext_hs.as_ref()),
112        _ => Err(UnsupportedProtocolVersion(4)),
113    }
114}
115
116impl Rendezvous {
117    fn empty_flags() -> HandshakeVsInfo {
118        HandshakeVsInfo::V5(HsV5Info::default())
119    }
120
121    fn transition(&mut self, state: RendezvousState) {
122        debug!(
123            "Rendezvous {:?} transitioning from {:?} to {:?}",
124            self.init_settings.local_sockid, self.state, state,
125        );
126        self.state = state
127    }
128
129    fn gen_packet(&self, shake_type: ShakeType, info: HandshakeVsInfo) -> HandshakeControlInfo {
130        HandshakeControlInfo {
131            init_seq_num: self.starting_seqnum,
132            max_packet_size: self.init_settings.max_packet_size,
133            max_flow_size: self.init_settings.max_flow_size,
134            socket_id: self.init_settings.local_sockid,
135            shake_type,
136            peer_addr: self.local_addr.ip(),
137            syn_cookie: self.cookie, // TODO: !!
138            info,
139        }
140    }
141
142    fn send(&mut self, dest_sockid: SocketId, packet: HandshakeControlInfo) -> ConnectionResult {
143        let pack_pair = (
144            ControlPacket {
145                timestamp: TimeStamp::from_micros(0),
146                dest_sockid,
147                control_type: ControlTypes::Handshake(packet),
148            }
149            .into(),
150            self.remote_public,
151        );
152        self.last_packet = pack_pair.clone();
153        SendPacket(pack_pair)
154    }
155
156    fn send_conclusion(
157        &mut self,
158        dest_sockid: SocketId,
159        info: HandshakeVsInfo,
160    ) -> ConnectionResult {
161        self.send(dest_sockid, self.gen_packet(ShakeType::Conclusion, info))
162    }
163
164    // fn send_agreement(&mut self, dest_sockid: SocketID, info: HandshakeVSInfo) -> ConnectionResult {
165    //     self.send(dest_sockid, self.gen_packet(ShakeType::Agreement, info))
166    // }
167
168    fn make_rejection(
169        &mut self,
170        response_to: &HandshakeControlInfo,
171        timestamp: TimeStamp,
172        r: ConnectionReject,
173    ) -> ConnectionResult {
174        Reject(
175            Some((
176                ControlPacket {
177                    timestamp,
178                    dest_sockid: response_to.socket_id,
179                    control_type: ControlTypes::Handshake(HandshakeControlInfo {
180                        shake_type: ShakeType::Rejection(r.reason()),
181                        socket_id: self.init_settings.local_sockid,
182                        ..response_to.clone()
183                    }),
184                }
185                .into(),
186                self.remote_public,
187            )),
188            r,
189        )
190    }
191
192    fn set_connected(
193        &self,
194        settings: ConnectionSettings,
195        agreement: Option<HandshakeControlInfo>,
196        to_send: Option<HandshakeControlInfo>,
197    ) -> ConnectionResult {
198        Connected(
199            to_send.map(|to_send| {
200                (
201                    ControlPacket {
202                        timestamp: TimeStamp::from_micros(0),
203                        dest_sockid: settings.remote_sockid,
204                        control_type: ControlTypes::Handshake(to_send),
205                    }
206                    .into(),
207                    self.remote_public,
208                )
209            }),
210            Connection {
211                settings,
212                handshake: Handshake::Rendezvous(agreement.map(ControlTypes::Handshake)),
213            },
214        )
215    }
216
217    fn handle_waving(
218        &mut self,
219        info: &HandshakeControlInfo,
220        timestamp: TimeStamp,
221        now: Instant,
222    ) -> ConnectionResult {
223        assert!(matches!(self.state, Waving));
224
225        // NOTE: the cookie comparison behavior is not correctly documented. See haivision/srt#1267
226        let role = match self.cookie.wrapping_sub(info.syn_cookie).cmp(&0) {
227            Ordering::Greater => Initiator,
228            Ordering::Less => Responder,
229            Ordering::Equal => return NotHandled(CookiesMatched(self.cookie)),
230        };
231
232        debug!(
233            "Rendezvous socket {:?} is {:?}",
234            self.init_settings.local_sockid, role
235        );
236
237        match (info.shake_type, role) {
238            (ShakeType::Waveahand, Initiator) => {
239                // NOTE: streamid not supported in rendezvous
240                let (hsv5, initiator) =
241                    start_hsv5_initiation(self.init_settings.clone(), None, now);
242
243                self.transition(AttentionInitiator(hsv5.clone(), initiator));
244
245                self.send_conclusion(info.socket_id, hsv5)
246            }
247            (ShakeType::Waveahand, Responder) => {
248                self.starting_seqnum = info.init_seq_num; // responder, take initiator's seqnum
249                self.transition(AttentionResponder(now));
250                self.send_conclusion(info.socket_id, Rendezvous::empty_flags())
251            }
252            (ShakeType::Conclusion, role) => {
253                let ext_info = match extract_ext_info(info) {
254                    Ok(ei) => ei,
255                    Err(e) => return NotHandled(e),
256                };
257                let hsv5_shake = match (&role, ext_info) {
258                    (Responder, Some(SrtControlPacket::HandshakeRequest(_))) => {
259                        let (hsv5, connection) = match gen_hsv5_response(
260                            &mut self.init_settings,
261                            info,
262                            self.remote_public,
263                            match self.last_send {
264                                Some(induction_time) => induction_time,
265                                None => {
266                                    return ConnectionResult::NotHandled(
267                                        ConnectError::WavehandExpected(info.clone()),
268                                    );
269                                }
270                            },
271                            now,
272                        ) {
273                            GenHsv5Result::Accept(h, c) => (h, c),
274                            GenHsv5Result::NotHandled(e) => return NotHandled(e),
275                            GenHsv5Result::Reject(r) => {
276                                return self.make_rejection(info, timestamp, r)
277                            }
278                        };
279                        self.starting_seqnum = info.init_seq_num; // responder, take initiator's seqnum
280                        self.transition(FineResponder(connection));
281
282                        hsv5
283                    }
284                    (Initiator, None) => {
285                        let (hsv5, initiator) =
286                            start_hsv5_initiation(self.init_settings.clone(), None, now); // NOTE: streamid not supported in rendezvous
287                        self.transition(FineInitiator(hsv5.clone(), initiator));
288                        hsv5
289                    }
290                    (Responder, Some(_)) => {
291                        return NotHandled(ExpectedHsReq);
292                    }
293                    (Initiator, Some(_)) => return NotHandled(ExpectedNoExtFlags),
294                    (Responder, None) => return NotHandled(ExpectedExtFlags),
295                };
296                self.send_conclusion(info.socket_id, hsv5_shake)
297            }
298            (ShakeType::Agreement, _) => NoAction,
299            (ShakeType::Induction, _) => NotHandled(RendezvousExpected(info.clone())),
300            (ShakeType::Rejection(rej), _) => Reject(None, ConnectionReject::Rejected(rej)),
301        }
302    }
303
304    fn handle_attention_initiator(
305        &mut self,
306        info: &HandshakeControlInfo,
307        hsv5: HandshakeVsInfo,
308        initiator: StartedInitiator,
309        now: Instant,
310    ) -> ConnectionResult {
311        match info.shake_type {
312            ShakeType::Conclusion => match extract_ext_info(info) {
313                Ok(Some(SrtControlPacket::HandshakeResponse(_))) => {
314                    let agreement =
315                        self.gen_packet(ShakeType::Agreement, Rendezvous::empty_flags());
316
317                    let settings =
318                        match initiator.finish_hsv5_initiation(info, self.remote_public, now) {
319                            Ok(s) => s,
320                            Err(r) => return NotHandled(r),
321                        };
322
323                    self.set_connected(settings, Some(agreement.clone()), Some(agreement))
324                }
325                Ok(Some(_)) => NotHandled(ExpectedHsResp),
326                Ok(None) => {
327                    self.transition(InitiatedInitiator(initiator));
328                    self.send_conclusion(info.socket_id, hsv5)
329                }
330                Err(e) => NotHandled(e),
331            },
332            _ => NoAction, // todo: errors
333        }
334    }
335
336    fn handle_attention_responder(
337        &mut self,
338        info: &HandshakeControlInfo,
339        timestamp: TimeStamp,
340        induction_time: Instant,
341        now: Instant,
342    ) -> ConnectionResult {
343        match info.shake_type {
344            ShakeType::Conclusion => {
345                match extract_ext_info(info) {
346                    Ok(Some(SrtControlPacket::HandshakeRequest(_))) => {} // ok, continue
347                    Ok(Some(_)) => return NotHandled(ExpectedHsReq),
348                    Ok(None) => return NotHandled(ExpectedExtFlags),
349                    Err(e) => return NotHandled(e),
350                };
351                let (hsv5, connection) = match gen_hsv5_response(
352                    &mut self.init_settings,
353                    info,
354                    self.remote_public,
355                    induction_time,
356                    now,
357                ) {
358                    GenHsv5Result::Accept(h, c) => (h, c),
359                    GenHsv5Result::NotHandled(e) => return NotHandled(e),
360                    GenHsv5Result::Reject(r) => return self.make_rejection(info, timestamp, r),
361                };
362                self.starting_seqnum = info.init_seq_num; // responder, take initiator's seqnum
363                self.transition(InitiatedResponder(connection));
364
365                self.send_conclusion(info.socket_id, hsv5)
366            }
367            _ => NoAction,
368        }
369    }
370
371    fn handle_fine_initiator(
372        &mut self,
373        info: &HandshakeControlInfo,
374        hsv5: HandshakeVsInfo,
375        initiator: StartedInitiator,
376        now: Instant,
377    ) -> ConnectionResult {
378        match info.shake_type {
379            ShakeType::Conclusion => match extract_ext_info(info) {
380                Ok(Some(SrtControlPacket::HandshakeResponse(_))) => {
381                    let agreement = self.gen_packet(ShakeType::Agreement, hsv5);
382
383                    let settings =
384                        match initiator.finish_hsv5_initiation(info, self.remote_public, now) {
385                            Ok(s) => s,
386                            Err(r) => return NotHandled(r),
387                        };
388
389                    self.set_connected(settings, Some(agreement.clone()), Some(agreement))
390                }
391                Ok(Some(_)) => NotHandled(ExpectedHsResp),
392                Ok(None) => NotHandled(ExpectedExtFlags),
393                Err(e) => NotHandled(e),
394            },
395            _ => NoAction, // real errors here
396        }
397    }
398
399    fn handle_fine_responder(
400        &mut self,
401        packet: &Packet,
402        connection: ConnectionSettings,
403    ) -> ConnectionResult {
404        match packet {
405            Packet::Data(_)
406            | Packet::Control(ControlPacket {
407                control_type:
408                    ControlTypes::Handshake(HandshakeControlInfo {
409                        shake_type: ShakeType::Agreement,
410                        ..
411                    }),
412                ..
413            })
414            | Packet::Control(ControlPacket {
415                control_type: ControlTypes::KeepAlive,
416                ..
417            }) => return self.set_connected(connection, None, None),
418            _ => {}
419        }
420        NoAction
421    }
422
423    fn handle_initiated_initiator(
424        &mut self,
425        info: &HandshakeControlInfo,
426        initiator: StartedInitiator,
427        now: Instant,
428    ) -> ConnectionResult {
429        match info.shake_type {
430            ShakeType::Conclusion => match extract_ext_info(info) {
431                Ok(Some(SrtControlPacket::HandshakeResponse(_))) => {
432                    let connection =
433                        match initiator.finish_hsv5_initiation(info, self.remote_public, now) {
434                            Ok(c) => c,
435                            Err(e) => return NotHandled(e),
436                        };
437
438                    let agreement =
439                        self.gen_packet(ShakeType::Agreement, Rendezvous::empty_flags());
440
441                    // TODO: not sure if this should return Some for agreement. A test was failing without it but check spec
442                    self.set_connected(connection, Some(agreement.clone()), Some(agreement))
443                }
444                Ok(Some(_)) => NotHandled(ExpectedHsResp),
445                Ok(None) => NotHandled(ExpectedExtFlags), // spec says stay in this state
446                Err(e) => NotHandled(e),
447            },
448            _ => NoAction, // real errors here
449        }
450    }
451
452    fn handle_initiated_responder(
453        &mut self,
454        packet: &Packet,
455        connection: ConnectionSettings,
456    ) -> ConnectionResult {
457        // if the shake still has flags, respond with flags and don't finish.
458        if let Ok(info) = get_handshake(packet) {
459            match (info.shake_type, extract_ext_info(info)) {
460                (_, Err(e)) => return NotHandled(e),
461                (ShakeType::Conclusion, Ok(Some(SrtControlPacket::HandshakeRequest(_)))) => {
462                    return NoAction; // TODO: this is a pretty roundabout way to do this...just waits for another tick
463                }
464                (ShakeType::Conclusion, Ok(Some(_))) => return NotHandled(ExpectedHsReq),
465                (ShakeType::Waveahand, _) => return NotHandled(AgreementExpected(info.clone())),
466                _ => {}
467            }
468        }
469
470        self.set_connected(
471            connection,
472            None,
473            Some(self.gen_packet(ShakeType::Agreement, Rendezvous::empty_flags())),
474        )
475    }
476
477    pub fn handle_packet(&mut self, packet: ReceivePacketResult, now: Instant) -> ConnectionResult {
478        use ReceivePacketError::*;
479        match packet {
480            Ok((packet, from)) => {
481                if from != self.remote_public {
482                    return NotHandled(UnexpectedHost(self.remote_public, from));
483                }
484
485                let hs = get_handshake(&packet);
486                match (self.state.clone(), hs) {
487                    (Waving, Ok(hs)) => self.handle_waving(hs, packet.timestamp(), now),
488                    (AttentionInitiator(hsv5, initiator), Ok(hs)) => {
489                        self.handle_attention_initiator(hs, hsv5, initiator, now)
490                    }
491                    (AttentionResponder(induction_time), Ok(hs)) => {
492                        self.handle_attention_responder(hs, packet.timestamp(), induction_time, now)
493                    }
494                    (InitiatedInitiator(initiator), Ok(hs)) => {
495                        self.handle_initiated_initiator(hs, initiator, now)
496                    }
497                    (InitiatedResponder(connection), _) => {
498                        self.handle_initiated_responder(&packet, connection)
499                    }
500                    (FineInitiator(hsv5, initiator), Ok(hs)) => {
501                        self.handle_fine_initiator(hs, hsv5, initiator, now)
502                    }
503                    (FineResponder(conn), _) => self.handle_fine_responder(&packet, conn),
504                    (_, Err(e)) => NotHandled(e),
505                }
506            }
507            Err(Io(error)) if error.kind() == ErrorKind::ConnectionReset => {
508                info!(
509                    "ConnectionReset received, rendezvous peer may not have opened the port yet..."
510                );
511                NoAction
512            }
513            Err(Io(error)) => Failure(error),
514            Err(Parse(e)) => NotHandled(ConnectError::ParseFailed(e)),
515        }
516    }
517
518    pub fn handle_tick(&mut self, now: Instant) -> ConnectionResult {
519        self.last_send = Some(now);
520        SendPacket(self.last_packet.clone())
521    }
522}