1use std::{
2 borrow::Cow,
3 convert::{TryFrom, TryInto},
4 net::{AddrParseError, IpAddr},
5 str::FromStr,
6 time::Duration,
7};
8
9use regex::Regex;
10use thiserror::Error;
11use url::{Host, ParseError, Url};
12
13use crate::options::*;
14
15#[derive(Debug, Clone, Eq, PartialEq)]
17pub struct SrtUri(BindOptions);
18
19#[derive(Error, Debug, Clone, Eq, PartialEq)]
20pub enum SrtUriError {
21 #[error("{0}")]
22 InvalidOptions(#[from] OptionsError),
23 #[error("{0}")]
24 InvalidUrl(#[from] ParseError),
25 #[error("Invalid adapter: {0}")]
26 InvalidAdapter(#[from] AddrParseError),
27 #[error("Invalid mode: {0}")]
28 InvalidMode(String),
29 #[error("Invalid parameter: {0}={1}, expected positive integer")]
30 InvalidIntParameter(&'static str, String),
31 #[error("Unimplemented parameter: {0}")]
32 UnimplementedParameter(&'static str),
33}
34
35pub fn url_parse(s: &str, mode_listener: bool) -> Result<Url, ParseError> {
36 let re = Regex::new(r"([a-z]{3})://:([0-9]*)\??(.*)").unwrap();
37 if re.is_match(s) {
38 let caps = re.captures(s).unwrap();
39 let protocol = caps.get(1).map_or("", |m| m.as_str());
40 let port = caps.get(2).map_or("", |m| m.as_str());
41 let options = caps.get(3).map_or("", |m| m.as_str());
42 let listener = if mode_listener { "mode=listener&" } else { "" };
43 Url::parse(&format!(
44 "{}://0.0.0.0:{}?{}{}",
45 protocol, port, listener, options
46 ))
47 } else {
48 Url::parse(s)
49 }
50}
51
52impl TryFrom<Url> for SrtUri {
53 type Error = SrtUriError;
54
55 fn try_from(url: Url) -> Result<Self, Self::Error> {
56 let (mode, adapter, stream_id, mut socket) = Self::parse_query_pairs(&url)?;
57 let host = Self::parse_host(&url);
58 let port = url.port();
59
60 use SrtUrlMode::*;
61 match (mode, host, port, adapter) {
62 (Unspecified, None, Some(port), None) => {
63 Ok(SrtUri(ListenerOptions::with(port, socket)?.into()))
64 }
65 (Unspecified, None, Some(port), Some(host)) | (Listener, Some(host), Some(port), None)
67 | (Listener, Some(_), Some(port), Some(host)) => Ok(SrtUri(
68 ListenerOptions::with(SocketAddress { host, port }, socket)?.into(),
69 )),
70 (Unspecified, Some(host), Some(port), adapter @ None)
71 | (Caller, Some(host), Some(port), adapter) => {
72 if let Some(adapter) = adapter {
73 let ip = adapter.try_into().unwrap();
74 socket.connect.local.set_ip(ip);
75 }
76 let remote = SocketAddress { host, port };
77 let stream_id = stream_id.as_ref().map(|s| s.as_ref());
78 Ok(SrtUri(
79 CallerOptions::with(remote, stream_id, socket)?.into(),
80 ))
81 }
82 (Unspecified, Some(host), Some(port), adapter @ Some(_))
83 | (Rendezvous, Some(host), Some(port), adapter) => {
84 let remote = SocketAddress { host, port };
85 if let Some(adapter) = adapter {
86 let ip = adapter.try_into().unwrap();
87 socket.connect.local.set_ip(ip);
88 }
89
90 if socket.connect.local.port() == 0 {
91 socket.connect.local.set_port(port);
92 }
93 Ok(SrtUri(RendezvousOptions::with(remote, socket)?.into()))
94 }
95 (Caller, Some(_), None, _)
96 | (Rendezvous, Some(_), None, _)
97 | (Unspecified, None, None, _)
98 | (Unspecified, Some(_), None, _)
99 | (Listener, None, _, _)
100 | (Listener, Some(_), None, _)
101 | (Caller, None, _, _)
102 | (Rendezvous, None, _, _) => {
103 unimplemented!();
104 }
105 }
106 }
107}
108
109impl FromStr for SrtUri {
110 type Err = SrtUriError;
111
112 fn from_str(s: &str) -> Result<Self, Self::Err> {
113 url_parse(s, true)?.try_into()
114 }
115}
116
/// Connection mode requested through the `mode` query parameter.
enum SrtUrlMode {
    /// No `mode` parameter was present in the query string.
    Unspecified,
    /// `mode=listener`
    Listener,
    /// `mode=caller`
    Caller,
    /// `mode=rendezvous`
    Rendezvous,
}
123
124type QueryPairs<'a> = (
125 SrtUrlMode,
126 Option<SocketHost>,
127 Option<Cow<'a, str>>,
128 SocketOptions,
129);
130
131impl SrtUri {
132 fn parse_query_pairs(url: &Url) -> Result<QueryPairs, SrtUriError> {
133 use SrtUriError::*;
134
135 let mut mode = SrtUrlMode::Unspecified;
136 let mut adapter: Option<SocketHost> = None;
137 let mut stream_id = None;
138 let mut socket = SocketOptions::default();
139
140 let mut inputbw = None;
141 let mut maxbw = None;
142 let mut mininputbw = None;
143 let mut oheadbw = None;
144
145 for (key, value) in url.query_pairs() {
146 match key.as_ref() {
147 "mode" => {
148 mode = match value.as_ref() {
149 "listener" => SrtUrlMode::Listener,
150 "caller" => SrtUrlMode::Caller,
151 "rendezvous" => SrtUrlMode::Rendezvous,
152 value => return Err(InvalidMode(value.to_string())),
153 };
154 }
155 "adapter" => {
156 adapter = Some(
157 IpAddr::from_str(value.as_ref())
158 .map_err(InvalidAdapter)?
159 .into(),
160 );
161 }
162 "port" => {
163 let value = u16::try_from(Self::parse_int_param("port", value)?)
164 .map_err(|e| SrtUriError::InvalidIntParameter("port", e.to_string()))?;
165 socket.connect.local.set_port(value);
166 }
167 "conntimeo" => {
168 let value = Self::parse_int_param("conntimeo", value)?;
169 socket.connect.timeout = Duration::from_millis(value);
170 }
171 "drifttracer" => unimplemented!(),
172 "enforcedencryption" => unimplemented!(),
173 "fc" => {
174 let value = Self::parse_int_param("fc", value)?;
175 socket.sender.flow_control_window_size = PacketCount(value);
176 }
177 "groupconnect" => return Err(UnimplementedParameter("groupconnect")),
178 "groupstabtimeo" => return Err(UnimplementedParameter("groupstabtimeo")),
179 "inputbw" => {
180 let value = Self::parse_int_param("inputbw", value)?;
181 if value > 0 {
182 inputbw = Some(DataRate(value));
183 }
184 }
185 "iptos" => unimplemented!(),
186 "ipttl" => {
187 let value = Self::parse_int_param("ipttl", value)?;
188 if value > 255 {
189 return Err(SrtUriError::InvalidIntParameter("ipttl", value.to_string()));
190 }
191 socket.connect.ip_ttl = value as u8;
192 }
193 "ipv6only" => unimplemented!(),
194 "kmpreannounce" => {
195 let value = Self::parse_int_param("kmpreannounce", value)?;
196 socket.encryption.km_refresh.period = PacketCount(value);
197 }
198 "kmrefreshrate" => {
199 let value = Self::parse_int_param("kmrefreshrate", value)?;
200 socket.encryption.km_refresh.pre_announcement_period = PacketCount(value);
201 }
202 "latency" => {
203 let value = Self::parse_int_param("latency", value)?;
204 let latency = Duration::from_millis(value);
205 socket.sender.peer_latency = latency;
206 socket.receiver.latency = latency;
207 }
208 "linger" => {
209 let value = Self::parse_int_param("linger", value)?;
210 socket.connect.linger = Some(Duration::from_millis(value));
211 }
212 "lossmaxttl" => {
213 let value = Self::parse_int_param("lossmaxttl", value)?;
214 socket.receiver.reorder_tolerance_max = PacketCount(value);
215 }
216 "maxbw" => {
217 let value = Self::parse_int_param("maxbw", value)?;
218 if value > 0 {
219 maxbw = Some(DataRate(value));
220 }
221 }
222 "mininputbw" => {
223 let value = Self::parse_int_param("mininputbw", value)?;
224 if value > 0 {
225 mininputbw = Some(DataRate(value));
226 }
227 }
228 "messageapi" => return Err(UnimplementedParameter("messageapi")),
229 "minversion" => {
230 let digits: Result<Vec<_>, _> =
231 value.as_ref().split('.').map(u8::from_str).collect();
232 match digits {
233 Err(_) => {
234 return Err(SrtUriError::InvalidIntParameter(
235 "minversion",
236 value.to_string(),
237 ))
238 }
239 Ok(digits) if digits.len() != 3 => {
240 return Err(SrtUriError::InvalidIntParameter(
241 "minversion",
242 value.to_string(),
243 ))
244 }
245 Ok(digits) => {
246 socket.connect.min_version =
247 SrtVersion::new(digits[0], digits[1], digits[2])
248 }
249 }
250 }
251 "mss" => {
252 let value = Self::parse_int_param("mss", value)?;
253 socket.session.max_segment_size = PacketSize(value);
254 }
255 "nakreport" => unimplemented!(),
256 "oheadbw" => {
257 let value = Self::parse_int_param("oheadbw", value)?;
258 if value > 5 {
259 oheadbw = Some(Percent(value));
260 }
261 }
262 "packetfilter" => return Err(UnimplementedParameter("packetfilter")),
263 "passphrase" => {
264 socket.encryption.passphrase = Some(value.to_string().try_into()?);
265 }
266 "payloadsize" => {
267 let value = Self::parse_int_param("payloadsize", value)?;
268 socket.sender.max_payload_size = PacketSize(value);
269 }
270 "pbkeylen" => {
271 let value = u16::try_from(Self::parse_int_param("pbkeylen", value)?)
272 .map_err(|e| InvalidIntParameter("pbkeylen", e.to_string()))?;
273 socket.encryption.key_size = value.try_into()?;
274 }
275 "peeridletimeo" => {
276 let value = Self::parse_int_param("peeridletimeo", value)?;
277 socket.session.peer_idle_timeout = Duration::from_millis(value);
278 }
279 "peerlatency" => {
280 let value = Self::parse_int_param("peerlatency", value)?;
281 socket.sender.peer_latency = Duration::from_millis(value);
282 }
283 "rcvbuf" => {
284 let value = Self::parse_int_param("rcvbuf", value)?;
285 socket.receiver.buffer_size = ByteCount(value);
286 }
287 "rcvlatency" => {
288 let value = Self::parse_int_param("rcvlatency", value)?;
289 socket.receiver.latency = Duration::from_millis(value);
290 }
291 "retransmitalgo" => unimplemented!(),
292 "sndbuf" => {
293 let value = Self::parse_int_param("sndbuf", value)?;
294 socket.sender.buffer_size = ByteCount(value);
295 }
296 "snddropdelay" => {
297 let value = Self::parse_int_param("snddropdelay", value)?;
298 socket.sender.drop_delay = Duration::from_millis(value);
299 }
300 "streamid" => {
301 stream_id = Some(value);
302 }
303 "tlpktdrop" => unimplemented!(),
304 "transtype" => return Err(UnimplementedParameter("transtype")),
305 "tsbpdmode" => return Err(UnimplementedParameter("tsbpdmode")),
306 _ => {}
307 }
308 }
309
310 socket.sender.bandwidth = match (maxbw, inputbw, oheadbw, mininputbw) {
311 (Some(rate), None, _, _) => LiveBandwidthMode::Max(rate),
312 (None, Some(rate), overhead, _) => LiveBandwidthMode::Input {
313 rate,
314 overhead: overhead.unwrap_or(Percent(5)),
315 },
316 (None, None, overhead, Some(expected)) => LiveBandwidthMode::Estimated {
317 expected,
318 overhead: overhead.unwrap_or(Percent(5)),
319 },
320 _ => LiveBandwidthMode::Unlimited,
321 };
322
323 Ok((mode, adapter, stream_id, socket))
324 }
325
326 fn parse_host(url: &Url) -> Option<SocketHost> {
327 let host = match url.host() {
328 Some(Host::Domain(domain)) => Some(match IpAddr::from_str(domain) {
329 Ok(IpAddr::V4(ip)) => Host::Ipv4(ip),
330 Ok(IpAddr::V6(ip)) => Host::Ipv6(ip),
331 Err(_) => Host::Domain(domain),
332 }),
333 host => host,
334 };
335
336 let host: Option<SocketHost> = match host {
337 Some(Host::Domain("")) | None => None,
338 Some(Host::Ipv4(ip)) => Some(ip.into()),
339 Some(Host::Ipv6(ip)) => Some(ip.into()),
340 Some(Host::Domain(domain)) => Some(SocketHost::Domain(domain.to_string())),
341 };
342 host
343 }
344
345 fn parse_int_param(key: &'static str, value: Cow<str>) -> Result<u64, SrtUriError> {
346 match value.as_ref().parse() {
347 Ok(0) | Err(_) => Err(SrtUriError::InvalidIntParameter(key, value.to_string())),
348 Ok(n) => Ok(n),
349 }
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    use std::net::SocketAddr;

    /// Parses `uri` through the `FromStr` implementation under test.
    fn parse(uri: &str) -> Result<SrtUri, SrtUriError> {
        uri.parse()
    }

    /// Expected outcome for a listener on `port` using `socket` options.
    fn listener_on(port: u16, socket: SocketOptions) -> Result<SrtUri, SrtUriError> {
        Ok(SrtUri(ListenerOptions::with(port, socket).unwrap().into()))
    }

    #[test]
    fn parse_listen() {
        // Port only: listen on every interface.
        let expected = ListenerOptions::new(1234).unwrap();
        assert_eq!(parse("srt://:1234"), Ok(SrtUri(expected.into())));

        // Port plus adapter: listen on the adapter address.
        let expected = ListenerOptions::new("127.0.0.1:1234").unwrap();
        assert_eq!(
            parse("srt://:1234?adapter=127.0.0.1"),
            Ok(SrtUri(expected.into()))
        );

        // Explicit listener mode with a host.
        let expected = ListenerOptions::new("10.10.10.100:5001").unwrap();
        assert_eq!(
            parse("srt://10.10.10.100:5001?mode=listener"),
            Ok(SrtUri(expected.into()))
        );
    }

    #[test]
    fn parse_call() {
        // Host and port without a mode: caller.
        let expected = CallerOptions::new("10.1.0.1:1234", None).unwrap();
        assert_eq!(parse("srt://10.1.0.1:1234"), Ok(SrtUri(expected.into())));

        // Explicit caller mode with a local adapter and port.
        let local = SocketAddr::from_str("127.0.0.1:4001").unwrap();
        let expected = CallerOptions::new("10.1.0.1:5001", None)
            .unwrap()
            .set(|o| o.socket.connect.local = local)
            .unwrap();
        assert_eq!(
            parse("srt://10.1.0.1:5001?adapter=127.0.0.1&port=4001&mode=caller"),
            Ok(SrtUri(expected.into()))
        );
    }

    #[test]
    fn parse_rendezvous() {
        // (uri, remote address, expected local address)
        let cases = vec![
            (
                "srt://10.1.0.1:1234?adapter=127.0.0.1",
                "10.1.0.1:1234",
                "127.0.0.1:1234",
            ),
            (
                "srt://10.1.0.1:5001?mode=rendezvous",
                "10.1.0.1:5001",
                "0.0.0.0:5001",
            ),
            (
                "srt://10.1.0.1:5001?port=4001&adapter=127.0.0.1",
                "10.1.0.1:5001",
                "127.0.0.1:4001",
            ),
        ];

        for (uri, remote, local) in cases {
            let local: SocketAddr = local.parse().unwrap();
            let expected = RendezvousOptions::new(remote)
                .unwrap()
                .set(|o| o.socket.connect.local = local)
                .unwrap();
            assert_eq!(parse(uri), Ok(SrtUri(expected.into())), "uri: {}", uri);
        }
    }

    #[test]
    fn parse_parameters() {
        let uri = "srt://10.1.1.1:1234?conntimeo=10000&fc=50000&ipttl=32&kmpreannounce=33000&kmrefreshrate=11000&latency=42&linger=128&lossmaxttl=256&mss=1300&passphrase=passphrase1234&payloadsize=1234&pbkeylen=32&peeridletimeo=4242&rcvbuf=22000000&sndbuf=23000000&snddropdelay=84&streamid=TheStreamID";

        let mut socket = SocketOptions::default();

        // Connection
        socket.connect.timeout = Duration::from_millis(10_000);
        socket.connect.ip_ttl = 32;
        socket.connect.linger = Some(Duration::from_millis(128));

        // Session
        socket.session.max_segment_size = PacketSize(1300);
        socket.session.peer_idle_timeout = Duration::from_millis(4242);

        // Encryption
        socket.encryption.km_refresh.period = PacketCount(33000);
        socket.encryption.km_refresh.pre_announcement_period = PacketCount(11000);
        socket.encryption.passphrase = Some("passphrase1234".into());
        socket.encryption.key_size = KeySize::AES256;

        // Sender
        socket.sender.flow_control_window_size = PacketCount(50_000);
        socket.sender.peer_latency = Duration::from_millis(42);
        socket.sender.max_payload_size = PacketSize(1234);
        socket.sender.buffer_size = ByteCount(23_000_000);
        socket.sender.drop_delay = Duration::from_millis(84);

        // Receiver
        socket.receiver.latency = Duration::from_millis(42);
        socket.receiver.reorder_tolerance_max = PacketCount(256);
        socket.receiver.buffer_size = ByteCount(22_000_000);

        let expected = CallerOptions::with("10.1.1.1:1234", Some("TheStreamID"), socket).unwrap();
        assert_eq!(SrtUri::from_str(uri), Ok(SrtUri(expected.into())));
    }

    #[test]
    fn parse_bandwidth() {
        // (uri, expected sender bandwidth mode)
        let cases = vec![
            (
                "srt://:1234?maxbw=10000000",
                LiveBandwidthMode::Max(DataRate(10_000_000)),
            ),
            (
                "srt://:1234?inputbw=20000000",
                LiveBandwidthMode::Input {
                    rate: DataRate(20_000_000),
                    overhead: Percent(5),
                },
            ),
            (
                "srt://:1234?inputbw=20000000&oheadbw=10",
                LiveBandwidthMode::Input {
                    rate: DataRate(20_000_000),
                    overhead: Percent(10),
                },
            ),
            (
                "srt://:1234?mininputbw=30000000",
                LiveBandwidthMode::Estimated {
                    expected: DataRate(30_000_000),
                    overhead: Percent(5),
                },
            ),
            (
                "srt://:1234?mininputbw=30000000&oheadbw=40",
                LiveBandwidthMode::Estimated {
                    expected: DataRate(30_000_000),
                    overhead: Percent(40),
                },
            ),
        ];

        for (uri, bandwidth) in cases {
            let mut socket = SocketOptions::default();
            socket.sender.bandwidth = bandwidth;
            assert_eq!(parse(uri), listener_on(1234, socket), "uri: {}", uri);
        }
    }
}