srt_protocol/options/
uri.rs

1use std::{
2    borrow::Cow,
3    convert::{TryFrom, TryInto},
4    net::{AddrParseError, IpAddr},
5    str::FromStr,
6    time::Duration,
7};
8
9use regex::Regex;
10use thiserror::Error;
11use url::{Host, ParseError, Url};
12
13use crate::options::*;
14
/// An SRT URI resolved into the [`BindOptions`] it describes.
///
/// Values are produced from a [`Url`] (via `TryFrom<Url>`) or directly from a string
/// (via `FromStr`); the conversion decides between listener, caller, and rendezvous
/// options from the host, port, and query parameters of the URI.
///
/// See <https://github.com/Haivision/srt/blob/master/docs/apps/srt-live-transmit.md#medium-srt>
/// for the URI syntax this follows.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SrtUri(BindOptions);
18
/// Errors produced while turning a URI into an [`SrtUri`].
#[derive(Error, Debug, Clone, Eq, PartialEq)]
pub enum SrtUriError {
    /// The parsed values were rejected when building or validating the options.
    #[error("{0}")]
    InvalidOptions(#[from] OptionsError),
    /// The input could not be parsed as a URL at all.
    #[error("{0}")]
    InvalidUrl(#[from] ParseError),
    /// The `adapter` query parameter is not a valid IP address.
    #[error("Invalid adapter: {0}")]
    InvalidAdapter(#[from] AddrParseError),
    /// The `mode` query parameter is not one of `listener`, `caller`, or `rendezvous`;
    /// carries the offending value.
    #[error("Invalid mode: {0}")]
    InvalidMode(String),
    /// An integer-valued query parameter could not be used; carries the parameter name
    /// and a description of the rejected value.
    #[error("Invalid parameter: {0}={1}, expected positive integer")]
    InvalidIntParameter(&'static str, String),
    /// The named query parameter is recognised but not supported; carries the
    /// parameter name.
    #[error("Unimplemented parameter: {0}")]
    UnimplementedParameter(&'static str),
}
34
35pub fn url_parse(s: &str, mode_listener: bool) -> Result<Url, ParseError> {
36    let re = Regex::new(r"([a-z]{3})://:([0-9]*)\??(.*)").unwrap();
37    if re.is_match(s) {
38        let caps = re.captures(s).unwrap();
39        let protocol = caps.get(1).map_or("", |m| m.as_str());
40        let port = caps.get(2).map_or("", |m| m.as_str());
41        let options = caps.get(3).map_or("", |m| m.as_str());
42        let listener = if mode_listener { "mode=listener&" } else { "" };
43        Url::parse(&format!(
44            "{}://0.0.0.0:{}?{}{}",
45            protocol, port, listener, options
46        ))
47    } else {
48        Url::parse(s)
49    }
50}
51
52impl TryFrom<Url> for SrtUri {
53    type Error = SrtUriError;
54
55    fn try_from(url: Url) -> Result<Self, Self::Error> {
56        let (mode, adapter, stream_id, mut socket) = Self::parse_query_pairs(&url)?;
57        let host = Self::parse_host(&url);
58        let port = url.port();
59
60        use SrtUrlMode::*;
61        match (mode, host, port, adapter) {
62            (Unspecified, None, Some(port), None) => {
63                Ok(SrtUri(ListenerOptions::with(port, socket)?.into()))
64            }
65            (Unspecified, None, Some(port), Some(host)) // DEPRECATED
66            | (Listener, Some(host), Some(port), None)
67            | (Listener, Some(_), Some(port), Some(host)) => Ok(SrtUri(
68                ListenerOptions::with(SocketAddress { host, port }, socket)?.into(),
69            )),
70            (Unspecified, Some(host), Some(port), adapter @ None)
71            | (Caller, Some(host), Some(port), adapter) => {
72                if let Some(adapter) = adapter {
73                    let ip = adapter.try_into().unwrap();
74                    socket.connect.local.set_ip(ip);
75                }
76                let remote = SocketAddress { host, port };
77                let stream_id = stream_id.as_ref().map(|s| s.as_ref());
78                Ok(SrtUri(
79                    CallerOptions::with(remote, stream_id, socket)?.into(),
80                ))
81            }
82            (Unspecified, Some(host), Some(port), adapter @ Some(_))
83            | (Rendezvous, Some(host), Some(port), adapter) => {
84                let remote = SocketAddress { host, port };
85                if let Some(adapter) = adapter {
86                    let ip = adapter.try_into().unwrap();
87                    socket.connect.local.set_ip(ip);
88                }
89
90                if socket.connect.local.port() == 0 {
91                    socket.connect.local.set_port(port);
92                }
93                Ok(SrtUri(RendezvousOptions::with(remote, socket)?.into()))
94            }
95            (Caller, Some(_), None, _)
96            | (Rendezvous, Some(_), None, _)
97            | (Unspecified, None, None, _)
98            | (Unspecified, Some(_), None, _)
99            | (Listener, None, _, _)
100            | (Listener, Some(_), None, _)
101            | (Caller, None, _, _)
102            | (Rendezvous, None, _, _) => {
103                unimplemented!();
104            }
105        }
106    }
107}
108
109impl FromStr for SrtUri {
110    type Err = SrtUriError;
111
112    fn from_str(s: &str) -> Result<Self, Self::Err> {
113        url_parse(s, true)?.try_into()
114    }
115}
116
/// Connection mode requested through the `mode` query parameter.
enum SrtUrlMode {
    /// No `mode` parameter was present; the mode is inferred from host and adapter.
    Unspecified,
    /// `mode=listener`
    Listener,
    /// `mode=caller`
    Caller,
    /// `mode=rendezvous`
    Rendezvous,
}
123
/// Everything extracted from a URI's query string, in order: the requested mode, the
/// optional `adapter` address, the optional `streamid` (borrowed from the URL where
/// possible), and the socket options populated from the remaining parameters.
type QueryPairs<'a> = (
    SrtUrlMode,
    Option<SocketHost>,
    Option<Cow<'a, str>>,
    SocketOptions,
);
130
131impl SrtUri {
132    fn parse_query_pairs(url: &Url) -> Result<QueryPairs, SrtUriError> {
133        use SrtUriError::*;
134
135        let mut mode = SrtUrlMode::Unspecified;
136        let mut adapter: Option<SocketHost> = None;
137        let mut stream_id = None;
138        let mut socket = SocketOptions::default();
139
140        let mut inputbw = None;
141        let mut maxbw = None;
142        let mut mininputbw = None;
143        let mut oheadbw = None;
144
145        for (key, value) in url.query_pairs() {
146            match key.as_ref() {
147                "mode" => {
148                    mode = match value.as_ref() {
149                        "listener" => SrtUrlMode::Listener,
150                        "caller" => SrtUrlMode::Caller,
151                        "rendezvous" => SrtUrlMode::Rendezvous,
152                        value => return Err(InvalidMode(value.to_string())),
153                    };
154                }
155                "adapter" => {
156                    adapter = Some(
157                        IpAddr::from_str(value.as_ref())
158                            .map_err(InvalidAdapter)?
159                            .into(),
160                    );
161                }
162                "port" => {
163                    let value = u16::try_from(Self::parse_int_param("port", value)?)
164                        .map_err(|e| SrtUriError::InvalidIntParameter("port", e.to_string()))?;
165                    socket.connect.local.set_port(value);
166                }
167                "conntimeo" => {
168                    let value = Self::parse_int_param("conntimeo", value)?;
169                    socket.connect.timeout = Duration::from_millis(value);
170                }
171                "drifttracer" => unimplemented!(),
172                "enforcedencryption" => unimplemented!(),
173                "fc" => {
174                    let value = Self::parse_int_param("fc", value)?;
175                    socket.sender.flow_control_window_size = PacketCount(value);
176                }
177                "groupconnect" => return Err(UnimplementedParameter("groupconnect")),
178                "groupstabtimeo" => return Err(UnimplementedParameter("groupstabtimeo")),
179                "inputbw" => {
180                    let value = Self::parse_int_param("inputbw", value)?;
181                    if value > 0 {
182                        inputbw = Some(DataRate(value));
183                    }
184                }
185                "iptos" => unimplemented!(),
186                "ipttl" => {
187                    let value = Self::parse_int_param("ipttl", value)?;
188                    if value > 255 {
189                        return Err(SrtUriError::InvalidIntParameter("ipttl", value.to_string()));
190                    }
191                    socket.connect.ip_ttl = value as u8;
192                }
193                "ipv6only" => unimplemented!(),
194                "kmpreannounce" => {
195                    let value = Self::parse_int_param("kmpreannounce", value)?;
196                    socket.encryption.km_refresh.period = PacketCount(value);
197                }
198                "kmrefreshrate" => {
199                    let value = Self::parse_int_param("kmrefreshrate", value)?;
200                    socket.encryption.km_refresh.pre_announcement_period = PacketCount(value);
201                }
202                "latency" => {
203                    let value = Self::parse_int_param("latency", value)?;
204                    let latency = Duration::from_millis(value);
205                    socket.sender.peer_latency = latency;
206                    socket.receiver.latency = latency;
207                }
208                "linger" => {
209                    let value = Self::parse_int_param("linger", value)?;
210                    socket.connect.linger = Some(Duration::from_millis(value));
211                }
212                "lossmaxttl" => {
213                    let value = Self::parse_int_param("lossmaxttl", value)?;
214                    socket.receiver.reorder_tolerance_max = PacketCount(value);
215                }
216                "maxbw" => {
217                    let value = Self::parse_int_param("maxbw", value)?;
218                    if value > 0 {
219                        maxbw = Some(DataRate(value));
220                    }
221                }
222                "mininputbw" => {
223                    let value = Self::parse_int_param("mininputbw", value)?;
224                    if value > 0 {
225                        mininputbw = Some(DataRate(value));
226                    }
227                }
228                "messageapi" => return Err(UnimplementedParameter("messageapi")),
229                "minversion" => {
230                    let digits: Result<Vec<_>, _> =
231                        value.as_ref().split('.').map(u8::from_str).collect();
232                    match digits {
233                        Err(_) => {
234                            return Err(SrtUriError::InvalidIntParameter(
235                                "minversion",
236                                value.to_string(),
237                            ))
238                        }
239                        Ok(digits) if digits.len() != 3 => {
240                            return Err(SrtUriError::InvalidIntParameter(
241                                "minversion",
242                                value.to_string(),
243                            ))
244                        }
245                        Ok(digits) => {
246                            socket.connect.min_version =
247                                SrtVersion::new(digits[0], digits[1], digits[2])
248                        }
249                    }
250                }
251                "mss" => {
252                    let value = Self::parse_int_param("mss", value)?;
253                    socket.session.max_segment_size = PacketSize(value);
254                }
255                "nakreport" => unimplemented!(),
256                "oheadbw" => {
257                    let value = Self::parse_int_param("oheadbw", value)?;
258                    if value > 5 {
259                        oheadbw = Some(Percent(value));
260                    }
261                }
262                "packetfilter" => return Err(UnimplementedParameter("packetfilter")),
263                "passphrase" => {
264                    socket.encryption.passphrase = Some(value.to_string().try_into()?);
265                }
266                "payloadsize" => {
267                    let value = Self::parse_int_param("payloadsize", value)?;
268                    socket.sender.max_payload_size = PacketSize(value);
269                }
270                "pbkeylen" => {
271                    let value = u16::try_from(Self::parse_int_param("pbkeylen", value)?)
272                        .map_err(|e| InvalidIntParameter("pbkeylen", e.to_string()))?;
273                    socket.encryption.key_size = value.try_into()?;
274                }
275                "peeridletimeo" => {
276                    let value = Self::parse_int_param("peeridletimeo", value)?;
277                    socket.session.peer_idle_timeout = Duration::from_millis(value);
278                }
279                "peerlatency" => {
280                    let value = Self::parse_int_param("peerlatency", value)?;
281                    socket.sender.peer_latency = Duration::from_millis(value);
282                }
283                "rcvbuf" => {
284                    let value = Self::parse_int_param("rcvbuf", value)?;
285                    socket.receiver.buffer_size = ByteCount(value);
286                }
287                "rcvlatency" => {
288                    let value = Self::parse_int_param("rcvlatency", value)?;
289                    socket.receiver.latency = Duration::from_millis(value);
290                }
291                "retransmitalgo" => unimplemented!(),
292                "sndbuf" => {
293                    let value = Self::parse_int_param("sndbuf", value)?;
294                    socket.sender.buffer_size = ByteCount(value);
295                }
296                "snddropdelay" => {
297                    let value = Self::parse_int_param("snddropdelay", value)?;
298                    socket.sender.drop_delay = Duration::from_millis(value);
299                }
300                "streamid" => {
301                    stream_id = Some(value);
302                }
303                "tlpktdrop" => unimplemented!(),
304                "transtype" => return Err(UnimplementedParameter("transtype")),
305                "tsbpdmode" => return Err(UnimplementedParameter("tsbpdmode")),
306                _ => {}
307            }
308        }
309
310        socket.sender.bandwidth = match (maxbw, inputbw, oheadbw, mininputbw) {
311            (Some(rate), None, _, _) => LiveBandwidthMode::Max(rate),
312            (None, Some(rate), overhead, _) => LiveBandwidthMode::Input {
313                rate,
314                overhead: overhead.unwrap_or(Percent(5)),
315            },
316            (None, None, overhead, Some(expected)) => LiveBandwidthMode::Estimated {
317                expected,
318                overhead: overhead.unwrap_or(Percent(5)),
319            },
320            _ => LiveBandwidthMode::Unlimited,
321        };
322
323        Ok((mode, adapter, stream_id, socket))
324    }
325
326    fn parse_host(url: &Url) -> Option<SocketHost> {
327        let host = match url.host() {
328            Some(Host::Domain(domain)) => Some(match IpAddr::from_str(domain) {
329                Ok(IpAddr::V4(ip)) => Host::Ipv4(ip),
330                Ok(IpAddr::V6(ip)) => Host::Ipv6(ip),
331                Err(_) => Host::Domain(domain),
332            }),
333            host => host,
334        };
335
336        let host: Option<SocketHost> = match host {
337            Some(Host::Domain("")) | None => None,
338            Some(Host::Ipv4(ip)) => Some(ip.into()),
339            Some(Host::Ipv6(ip)) => Some(ip.into()),
340            Some(Host::Domain(domain)) => Some(SocketHost::Domain(domain.to_string())),
341        };
342        host
343    }
344
345    fn parse_int_param(key: &'static str, value: Cow<str>) -> Result<u64, SrtUriError> {
346        match value.as_ref().parse() {
347            Ok(0) | Err(_) => Err(SrtUriError::InvalidIntParameter(key, value.to_string())),
348            Ok(n) => Ok(n),
349        }
350    }
351}
352
353#[cfg(test)]
354mod tests {
355    use super::*;
356
357    use std::net::SocketAddr;
358
359    #[test]
360    fn parse_listen() {
361        // Listener mode: if you leave the host part empty (adapter may be specified)
362        assert_eq!(
363            "srt://:1234".parse(),
364            Ok(SrtUri(ListenerOptions::new(1234).unwrap().into()))
365        );
366        assert_eq!(
367            "srt://:1234?adapter=127.0.0.1".parse(),
368            Ok(SrtUri(
369                ListenerOptions::new("127.0.0.1:1234").unwrap().into()
370            ))
371        );
372        assert_eq!(
373            "srt://10.10.10.100:5001?mode=listener".parse(),
374            Ok(SrtUri(
375                ListenerOptions::new("10.10.10.100:5001").unwrap().into()
376            ))
377        );
378    }
379
380    #[test]
381    fn parse_call() {
382        // Caller mode: if you specify host part, but not adapter parameter
383        assert_eq!(
384            "srt://10.1.0.1:1234".parse(),
385            Ok(SrtUri(
386                CallerOptions::new("10.1.0.1:1234", None).unwrap().into()
387            ))
388        );
389        assert_eq!(
390            "srt://10.1.0.1:5001?adapter=127.0.0.1&port=4001&mode=caller".parse(),
391            Ok(SrtUri(
392                CallerOptions::new("10.1.0.1:5001", None)
393                    .unwrap()
394                    .set(|o| o.socket.connect.local =
395                        SocketAddr::from_str("127.0.0.1:4001").unwrap())
396                    .unwrap()
397                    .into()
398            ))
399        );
400    }
401
402    #[test]
403    fn parse_rendezvous() {
404        // Rendezvous mode: if you specify host AND adapter parameter
405        assert_eq!(
406            "srt://10.1.0.1:1234?adapter=127.0.0.1".parse(),
407            Ok(SrtUri(
408                RendezvousOptions::new("10.1.0.1:1234")
409                    .unwrap()
410                    .set(|o| o.socket.connect.local = "127.0.0.1:1234".parse().unwrap())
411                    .unwrap()
412                    .into()
413            ))
414        );
415        assert_eq!(
416            "srt://10.1.0.1:5001?mode=rendezvous".parse(),
417            Ok(SrtUri(
418                RendezvousOptions::new("10.1.0.1:5001")
419                    .unwrap()
420                    .set(|o| o.socket.connect.local = "0.0.0.0:5001".parse().unwrap())
421                    .unwrap()
422                    .into()
423            ))
424        );
425        assert_eq!(
426            "srt://10.1.0.1:5001?port=4001&adapter=127.0.0.1".parse(),
427            Ok(SrtUri(
428                RendezvousOptions::new("10.1.0.1:5001")
429                    .unwrap()
430                    .set(|o| o.socket.connect.local = "127.0.0.1:4001".parse().unwrap())
431                    .unwrap()
432                    .into()
433            ))
434        );
435    }
436
437    #[test]
438    fn parse_parameters() {
439        let mut socket = SocketOptions::default();
440        socket.connect.timeout = Duration::from_millis(10_000);
441        socket.sender.flow_control_window_size = PacketCount(50_000);
442        socket.connect.ip_ttl = 32;
443        socket.encryption.km_refresh.period = PacketCount(33000);
444        socket.encryption.km_refresh.pre_announcement_period = PacketCount(11000);
445        socket.sender.peer_latency = Duration::from_millis(42);
446        socket.receiver.latency = Duration::from_millis(42);
447        socket.connect.linger = Some(Duration::from_millis(128));
448        socket.receiver.reorder_tolerance_max = PacketCount(256);
449        socket.session.max_segment_size = PacketSize(1300);
450        socket.encryption.passphrase = Some("passphrase1234".into());
451        socket.sender.max_payload_size = PacketSize(1234);
452        socket.encryption.key_size = KeySize::AES256;
453        socket.session.peer_idle_timeout = Duration::from_millis(4242);
454        socket.receiver.buffer_size = ByteCount(22_000_000);
455        socket.sender.buffer_size = ByteCount(23_000_000);
456        socket.sender.drop_delay = Duration::from_millis(84);
457
458        assert_eq!(
459            SrtUri::from_str("srt://10.1.1.1:1234?conntimeo=10000&fc=50000&ipttl=32&kmpreannounce=33000&kmrefreshrate=11000&latency=42&linger=128&lossmaxttl=256&mss=1300&passphrase=passphrase1234&payloadsize=1234&pbkeylen=32&peeridletimeo=4242&rcvbuf=22000000&sndbuf=23000000&snddropdelay=84&streamid=TheStreamID"),
460            Ok(SrtUri(CallerOptions::with("10.1.1.1:1234", Some("TheStreamID"), socket).unwrap().into()))
461        );
462    }
463
464    #[test]
465    fn parse_bandwidth() {
466        let mut socket = SocketOptions::default();
467        socket.sender.bandwidth = LiveBandwidthMode::Max(DataRate(10_000_000));
468
469        assert_eq!(
470            "srt://:1234?maxbw=10000000".parse(),
471            Ok(SrtUri(ListenerOptions::with(1234, socket).unwrap().into()))
472        );
473
474        let mut socket = SocketOptions::default();
475        socket.sender.bandwidth = LiveBandwidthMode::Input {
476            rate: DataRate(20_000_000),
477            overhead: Percent(5),
478        };
479        assert_eq!(
480            "srt://:1234?inputbw=20000000".parse(),
481            Ok(SrtUri(ListenerOptions::with(1234, socket).unwrap().into()))
482        );
483
484        let mut socket = SocketOptions::default();
485        socket.sender.bandwidth = LiveBandwidthMode::Input {
486            rate: DataRate(20_000_000),
487            overhead: Percent(10),
488        };
489        assert_eq!(
490            "srt://:1234?inputbw=20000000&oheadbw=10".parse(),
491            Ok(SrtUri(ListenerOptions::with(1234, socket).unwrap().into()))
492        );
493
494        let mut socket = SocketOptions::default();
495        socket.sender.bandwidth = LiveBandwidthMode::Estimated {
496            expected: DataRate(30_000_000),
497            overhead: Percent(5),
498        };
499        assert_eq!(
500            "srt://:1234?mininputbw=30000000".parse(),
501            Ok(SrtUri(ListenerOptions::with(1234, socket).unwrap().into()))
502        );
503
504        let mut socket = SocketOptions::default();
505        socket.sender.bandwidth = LiveBandwidthMode::Estimated {
506            expected: DataRate(30_000_000),
507            overhead: Percent(40),
508        };
509        assert_eq!(
510            "srt://:1234?mininputbw=30000000&oheadbw=40".parse(),
511            Ok(SrtUri(ListenerOptions::with(1234, socket).unwrap().into()))
512        );
513    }
514}