tx5_go_pion_turn/
lib.rs

1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3#![doc = tx5_core::__doc_header!()]
4//! # tx5-go-pion-turn
5//!
6//! Rust process wrapper around tx5-go-pion-turn executable.
7
8/// Re-exported dependencies.
9pub mod deps {
10    pub use tx5_core;
11    pub use tx5_core::deps::*;
12}
13
14const EXE_BYTES: &[u8] =
15    include_bytes!(concat!(env!("OUT_DIR"), "/tx5-go-pion-turn"));
16
17include!(concat!(env!("OUT_DIR"), "/exe_hash.rs"));
18
19pub use tx5_core::{Error, ErrorExt, Result};
20
21use once_cell::sync::Lazy;
22
23// keep the file handle open to mitigate replacements on some oses
24static EXE: Lazy<tx5_core::file_check::FileCheck> = Lazy::new(|| {
25    #[cfg(windows)]
26    let ext = ".exe";
27    #[cfg(not(windows))]
28    let ext = "";
29
30    match tx5_core::file_check::file_check(
31        EXE_BYTES,
32        EXE_HASH,
33        "tx5-go-pion-turn",
34        ext,
35    ) {
36        Err(err) => panic!("failed to write turn exe: {err:?}"),
37        Ok(exe) => exe,
38    }
39});
40
41/// Rust process wrapper around tx5-go-pion-turn executable.
42pub struct Tx5TurnServer {
43    proc: tokio::process::Child,
44}
45
46impl Tx5TurnServer {
47    /// Start up a new TURN server.
48    pub async fn new(
49        public_ip: std::net::IpAddr,
50        bind_port: u16,
51        users: Vec<(String, String)>,
52        realm: String,
53    ) -> Result<(String, Self)> {
54        let mut cmd = tokio::process::Command::new(EXE.path());
55        cmd.stdin(std::process::Stdio::piped());
56        cmd.stdout(std::process::Stdio::piped());
57        cmd.stderr(std::process::Stdio::piped());
58        cmd.kill_on_drop(true);
59        cmd.arg("-public-ip");
60        cmd.arg(public_ip.to_string());
61        cmd.arg("-port");
62        cmd.arg(format!("{bind_port}"));
63        cmd.arg("-realm");
64        cmd.arg(realm);
65        cmd.arg("-users");
66        cmd.arg(
67            users
68                .into_iter()
69                .map(|(u, p)| format!("{u}={p}"))
70                .collect::<Vec<_>>()
71                .join(","),
72        );
73
74        tracing::info!("ABOUT TO SPAWN: {cmd:?}");
75
76        let mut proc = cmd.spawn()?;
77        drop(proc.stdin.take());
78        let mut stderr = proc.stderr.take().unwrap();
79
80        let mut output = Vec::new();
81        let mut buf = [0; 4096];
82        loop {
83            use tokio::io::AsyncReadExt;
84
85            let len = stderr.read(&mut buf).await?;
86            if len == 0 {
87                return Err(Error::id("BrokenPipe"));
88            }
89            output.extend_from_slice(&buf[0..len]);
90
91            let s = String::from_utf8_lossy(&output);
92            let mut i = s.split("#ICE#(");
93            if i.next().is_some() {
94                if let Some(s) = i.next() {
95                    if s.contains(")#") {
96                        let mut i = s.split(")#");
97                        if let Some(s) = i.next() {
98                            return Ok((s.to_string(), Self { proc }));
99                        }
100                    }
101                }
102            }
103        }
104    }
105
106    /// Stop and clean up the TURN server sub-process.
107    /// Note, a drop will attempt to clean up the process, but to be sure,
108    /// use this function.
109    pub async fn stop(mut self) -> Result<()> {
110        // note, if the process already ended, kill may return an error.
111        let _ = self.proc.kill().await;
112        self.proc.wait().await?;
113        Ok(())
114    }
115}
116
117/// Construct an ephemeral turn server on an available port designed for
118/// unit testing.
119pub async fn test_turn_server() -> Result<(String, Tx5TurnServer)> {
120    let mut addr = None;
121
122    for iface in if_addrs::get_if_addrs()? {
123        if iface.is_loopback() {
124            continue;
125        }
126        if iface.ip().is_ipv6() {
127            continue;
128        }
129        addr = Some(iface.ip());
130        break;
131    }
132
133    if addr.is_none() {
134        return Err(Error::id("NoLocalIp"));
135    }
136
137    let (turn, srv) = Tx5TurnServer::new(
138        addr.unwrap(),
139        0,
140        vec![("test".into(), "test".into())],
141        "holo.host".into(),
142    )
143    .await?;
144
145    let turn = format!("{{\"urls\":[\"{turn}\"],\"username\":\"test\",\"credential\":\"test\"}}");
146
147    Ok((turn, srv))
148}
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153
154    fn init_tracing() {
155        let subscriber = tracing_subscriber::FmtSubscriber::builder()
156            .with_env_filter(
157                tracing_subscriber::filter::EnvFilter::from_default_env(),
158            )
159            .with_file(true)
160            .with_line_number(true)
161            .finish();
162        let _ = tracing::subscriber::set_global_default(subscriber);
163    }
164
165    #[tokio::test(flavor = "multi_thread")]
166    async fn sanity() {
167        init_tracing();
168
169        let (ice, srv) = test_turn_server().await.unwrap();
170
171        println!("{}", ice);
172
173        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
174
175        srv.stop().await.unwrap();
176    }
177}