Skip to main content

clia_turn/server/
mod.rs

1#[cfg(test)]
2mod server_test;
3
4pub mod config;
5pub mod request;
6
7use std::collections::HashMap;
8use std::sync::Arc;
9
10use config::*;
11use request::*;
12use tokio::sync::broadcast::error::RecvError;
13use tokio::sync::broadcast::{self};
14use tokio::sync::{mpsc, oneshot, Mutex};
15use tokio::time::{Duration, Instant};
16use util::Conn;
17
18use crate::allocation::allocation_manager::*;
19use crate::allocation::five_tuple::FiveTuple;
20use crate::allocation::AllocationInfo;
21use crate::auth::AuthHandler;
22use crate::error::*;
23use crate::proto::lifetime::DEFAULT_LIFETIME;
24
/// Size, in bytes, of the buffer each read loop allocates to receive one inbound datagram.
const INBOUND_MTU: usize = 1500;
26
/// Server is an instance of the TURN Server
///
/// [`Server::new`] spawns one read-loop task per configured connection; the other public
/// methods talk to those tasks by broadcasting a `Command` to all of them.
pub struct Server {
    /// Authentication handler shared with every read loop and attached to each `Request`.
    auth_handler: Arc<dyn AuthHandler + Send + Sync>,
    /// Realm string copied into each `Request`.
    realm: String,
    /// Channel-bind timeout handed to each `Request`; a configured value of zero is replaced
    /// by `DEFAULT_LIFETIME` in [`Server::new`].
    channel_bind_timeout: Duration,
    /// Map shared by all read loops and handed to each `Request`.
    // NOTE(review): presumably maps an issued nonce to the instant it was created — confirm
    // against the `request` module.
    pub(crate) nonces: Arc<Mutex<HashMap<String, Instant>>>,
    /// Sending half of the broadcast channel used to reach the spawned tasks.
    /// It is `None` once [`Server::close`] has taken it.
    command_tx: Mutex<Option<broadcast::Sender<Command>>>,
}
35
36impl Server {
37    /// creates a new TURN server
38    pub async fn new(config: ServerConfig) -> Result<Self> {
39        config.validate()?;
40
41        let (command_tx, _) = broadcast::channel(16);
42        let mut s = Server {
43            auth_handler: config.auth_handler,
44            realm: config.realm,
45            channel_bind_timeout: config.channel_bind_timeout,
46            nonces: Arc::new(Mutex::new(HashMap::new())),
47            command_tx: Mutex::new(Some(command_tx.clone())),
48        };
49
50        if s.channel_bind_timeout == Duration::from_secs(0) {
51            s.channel_bind_timeout = DEFAULT_LIFETIME;
52        }
53
54        for p in config.conn_configs.into_iter() {
55            let nonces = Arc::clone(&s.nonces);
56            let auth_handler = Arc::clone(&s.auth_handler);
57            let realm = s.realm.clone();
58            let channel_bind_timeout = s.channel_bind_timeout;
59            let handle_rx = command_tx.subscribe();
60            let conn = p.conn;
61            let allocation_manager = Arc::new(Manager::new(ManagerConfig {
62                relay_addr_generator: p.relay_addr_generator,
63                alloc_close_notify: config.alloc_close_notify.clone(),
64            }));
65
66            tokio::spawn(Server::read_loop(
67                conn,
68                allocation_manager,
69                nonces,
70                auth_handler,
71                realm,
72                channel_bind_timeout,
73                handle_rx,
74            ));
75        }
76
77        Ok(s)
78    }
79
80    /// Deletes all existing [`Allocation`][`Allocation`]s by the provided `username`.
81    ///
82    /// [`Allocation`]: crate::allocation::Allocation
83    pub async fn delete_allocations_by_username(&self, username: String) -> Result<()> {
84        let tx = {
85            let command_tx = self.command_tx.lock().await;
86            command_tx.clone()
87        };
88        if let Some(tx) = tx {
89            let (closed_tx, closed_rx) = mpsc::channel(1);
90            tx.send(Command::DeleteAllocations(username, Arc::new(closed_rx)))
91                .map_err(|_| Error::ErrClosed)?;
92
93            closed_tx.closed().await;
94
95            Ok(())
96        } else {
97            Err(Error::ErrClosed)
98        }
99    }
100
101    /// Get information of [`Allocation`][`Allocation`]s by specified [`FiveTuple`]s.
102    ///
103    /// If `five_tuples` is:
104    /// - [`None`]: It returns information about the all
105    ///   [`Allocation`][`Allocation`]s.
106    /// - [`Some`] and not empty: It returns information about
107    ///   the [`Allocation`][`Allocation`]s associated with
108    ///   the specified [`FiveTuples`].
109    /// - [`Some`], but empty: It returns an empty [`HashMap`].
110    ///
111    /// [`Allocation`]: crate::allocation::Allocation
112    pub async fn get_allocations_info(
113        &self,
114        five_tuples: Option<Vec<FiveTuple>>,
115    ) -> Result<HashMap<FiveTuple, AllocationInfo>> {
116        if let Some(five_tuples) = &five_tuples {
117            if five_tuples.is_empty() {
118                return Ok(HashMap::new());
119            }
120        }
121
122        let tx = {
123            let command_tx = self.command_tx.lock().await;
124            command_tx.clone()
125        };
126        if let Some(tx) = tx {
127            let (infos_tx, mut infos_rx) = mpsc::channel(1);
128            tx.send(Command::GetAllocationsInfo(five_tuples, infos_tx))
129                .map_err(|_| Error::ErrClosed)?;
130
131            let mut info: HashMap<FiveTuple, AllocationInfo> = HashMap::new();
132
133            for _ in 0..tx.receiver_count() {
134                info.extend(infos_rx.recv().await.ok_or(Error::ErrClosed)?);
135            }
136
137            Ok(info)
138        } else {
139            Err(Error::ErrClosed)
140        }
141    }
142
143    async fn read_loop(
144        conn: Arc<dyn Conn + Send + Sync>,
145        allocation_manager: Arc<Manager>,
146        nonces: Arc<Mutex<HashMap<String, Instant>>>,
147        auth_handler: Arc<dyn AuthHandler + Send + Sync>,
148        realm: String,
149        channel_bind_timeout: Duration,
150        mut handle_rx: broadcast::Receiver<Command>,
151    ) {
152        let mut buf = vec![0u8; INBOUND_MTU];
153
154        let (mut close_tx, mut close_rx) = oneshot::channel::<()>();
155
156        tokio::spawn({
157            let allocation_manager = Arc::clone(&allocation_manager);
158
159            async move {
160                loop {
161                    match handle_rx.recv().await {
162                        Ok(Command::DeleteAllocations(name, _)) => {
163                            allocation_manager
164                                .delete_allocations_by_username(name.as_str())
165                                .await;
166                            continue;
167                        }
168                        Ok(Command::GetAllocationsInfo(five_tuples, tx)) => {
169                            let infos = allocation_manager.get_allocations_info(five_tuples).await;
170                            let _ = tx.send(infos).await;
171
172                            continue;
173                        }
174                        Err(RecvError::Closed) | Ok(Command::Close(_)) => {
175                            close_rx.close();
176                            break;
177                        }
178                        Err(RecvError::Lagged(n)) => {
179                            log::warn!("Turn server has lagged by {} messages", n);
180                            continue;
181                        }
182                    }
183                }
184            }
185        });
186
187        loop {
188            let (n, addr) = tokio::select! {
189                v = conn.recv_from(&mut buf) => {
190                    match v {
191                        Ok(v) => v,
192                        Err(err) => {
193                            log::debug!("exit read loop on error: {}", err);
194                            break;
195                        }
196                    }
197                },
198                _ = close_tx.closed() => break
199            };
200
201            let mut r = Request {
202                conn: Arc::clone(&conn),
203                src_addr: addr,
204                buff: buf[..n].to_vec(),
205                allocation_manager: Arc::clone(&allocation_manager),
206                nonces: Arc::clone(&nonces),
207                auth_handler: Arc::clone(&auth_handler),
208                realm: realm.clone(),
209                channel_bind_timeout,
210            };
211
212            if let Err(err) = r.handle_request().await {
213                log::error!("error when handling datagram: {}", err);
214            }
215        }
216
217        let _ = allocation_manager.close().await;
218        let _ = conn.close().await;
219    }
220
221    /// Close stops the TURN Server. It cleans up any associated state and closes all connections it is managing.
222    pub async fn close(&self) -> Result<()> {
223        let tx = {
224            let mut command_tx = self.command_tx.lock().await;
225            command_tx.take()
226        };
227
228        if let Some(tx) = tx {
229            if tx.receiver_count() == 0 {
230                return Ok(());
231            }
232
233            let (closed_tx, closed_rx) = mpsc::channel(1);
234            let _ = tx.send(Command::Close(Arc::new(closed_rx)));
235            closed_tx.closed().await
236        }
237
238        Ok(())
239    }
240}
241
/// The protocol to communicate between the [`Server`]'s public methods
/// and the tasks spawned in the [`Server::read_loop`] method.
///
/// Commands are sent over a broadcast channel, so every subscribed task gets its own clone.
/// The `Arc<mpsc::Receiver<()>>` payloads are never read from: the issuing method keeps the
/// matching sender and awaits its `closed()` future, which serves as the completion signal.
#[derive(Clone)]
enum Command {
    /// Command to delete [`Allocation`]s by the provided `username`.
    ///
    /// Fields: the username, and the completion receiver described on [`Command`].
    ///
    /// [`Allocation`]: crate::allocation::Allocation
    DeleteAllocations(String, Arc<mpsc::Receiver<()>>),

    /// Command to get information about [`Allocation`]s by the provided [`FiveTuple`]s.
    ///
    /// Fields: the optional filter, and the sender on which each task replies with the
    /// information held by its own allocation manager.
    ///
    /// [`Allocation`]: crate::allocation::Allocation
    GetAllocationsInfo(
        Option<Vec<FiveTuple>>,
        mpsc::Sender<HashMap<FiveTuple, AllocationInfo>>,
    ),

    /// Command to close the [`Server`].
    ///
    /// The field is the completion receiver described on [`Command`].
    Close(Arc<mpsc::Receiver<()>>),
}