webrtc_turn/server/
mod.rs

1#[cfg(test)]
2mod server_test;
3
4pub mod config;
5pub mod request;
6
7use crate::allocation::allocation_manager::*;
8use crate::auth::AuthHandler;
9use crate::proto::lifetime::DEFAULT_LIFETIME;
10use config::*;
11use request::*;
12
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::Mutex;
16use tokio::time::{Duration, Instant};
17
18use util::{Conn, Error};
19
/// Size, in bytes, of the receive buffer that `Server::read_loop` allocates
/// once and reuses for every `recv_from` call on a connection.
const INBOUND_MTU: usize = 1500;
21
22// Server is an instance of the Pion TURN Server
23pub struct Server {
24    auth_handler: Arc<Box<dyn AuthHandler + Send + Sync>>,
25    realm: String,
26    channel_bind_timeout: Duration,
27    pub(crate) nonces: Arc<Mutex<HashMap<String, Instant>>>,
28}
29
30impl Server {
31    // creates the TURN server
32    pub async fn new(config: ServerConfig) -> Result<Self, Error> {
33        config.validate()?;
34
35        let mut s = Server {
36            auth_handler: config.auth_handler,
37            realm: config.realm,
38            channel_bind_timeout: config.channel_bind_timeout,
39            nonces: Arc::new(Mutex::new(HashMap::new())),
40        };
41
42        if s.channel_bind_timeout == Duration::from_secs(0) {
43            s.channel_bind_timeout = DEFAULT_LIFETIME;
44        }
45
46        for p in config.conn_configs.into_iter() {
47            let nonces = Arc::clone(&s.nonces);
48            let auth_handler = Arc::clone(&s.auth_handler);
49            let realm = s.realm.clone();
50            let channel_bind_timeout = s.channel_bind_timeout;
51
52            tokio::spawn(async move {
53                let allocation_manager = Arc::new(Manager::new(ManagerConfig {
54                    relay_addr_generator: p.relay_addr_generator,
55                }));
56
57                let _ = Server::read_loop(
58                    p.conn,
59                    allocation_manager,
60                    nonces,
61                    auth_handler,
62                    realm,
63                    channel_bind_timeout,
64                )
65                .await;
66            });
67        }
68
69        Ok(s)
70    }
71
72    async fn read_loop(
73        conn: Arc<dyn Conn + Send + Sync>,
74        allocation_manager: Arc<Manager>,
75        nonces: Arc<Mutex<HashMap<String, Instant>>>,
76        auth_handler: Arc<Box<dyn AuthHandler + Send + Sync>>,
77        realm: String,
78        channel_bind_timeout: Duration,
79    ) {
80        let mut buf = vec![0u8; INBOUND_MTU];
81
82        loop {
83            //TODO: gracefully exit loop
84            let (n, addr) = match conn.recv_from(&mut buf).await {
85                Ok((n, addr)) => (n, addr),
86                Err(err) => {
87                    log::debug!("exit read loop on error: {}", err);
88                    break;
89                }
90            };
91
92            let mut r = Request {
93                conn: Arc::clone(&conn),
94                src_addr: addr,
95                buff: buf[..n].to_vec(),
96                allocation_manager: Arc::clone(&allocation_manager),
97                nonces: Arc::clone(&nonces),
98                auth_handler: Arc::clone(&auth_handler),
99                realm: realm.clone(),
100                channel_bind_timeout,
101            };
102
103            if let Err(err) = r.handle_request().await {
104                log::error!("error when handling datagram: {}", err);
105            }
106        }
107
108        let _ = allocation_manager.close().await;
109    }
110
111    // Close stops the TURN Server. It cleans up any associated state and closes all connections it is managing
112    pub fn close(&self) -> Result<(), Error> {
113        Ok(())
114    }
115}