1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#[cfg(test)]
mod server_test;
pub mod config;
pub mod request;
use crate::allocation::allocation_manager::*;
use crate::auth::AuthHandler;
use crate::proto::lifetime::DEFAULT_LIFETIME;
use config::*;
use request::*;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{Duration, Instant};
use util::{Conn, Error};
/// Size in bytes of the receive buffer that `Server::read_loop` passes to `recv_from`.
const INBOUND_MTU: usize = 1500;
/// Server state built by `Server::new` from a `ServerConfig`.
///
/// The same values are also handed to the read loop that `Server::new` spawns for each
/// configured connection.
pub struct Server {
/// Authentication handler taken from the config; each read loop gets a clone of this `Arc`.
auth_handler: Arc<Box<dyn AuthHandler + Send + Sync>>,
/// Realm string taken from the config and copied into every `Request`.
realm: String,
/// Channel bind timeout from the config; `Server::new` replaces a zero value with
/// `DEFAULT_LIFETIME`.
channel_bind_timeout: Duration,
/// Map shared (behind an async mutex) with every `Request` created by the read loops.
/// NOTE(review): presumably nonce string -> creation time; confirm in the `request` module.
pub(crate) nonces: Arc<Mutex<HashMap<String, Instant>>>,
}
impl Server {
pub async fn new(config: ServerConfig) -> Result<Self, Error> {
config.validate()?;
let mut s = Server {
auth_handler: config.auth_handler,
realm: config.realm,
channel_bind_timeout: config.channel_bind_timeout,
nonces: Arc::new(Mutex::new(HashMap::new())),
};
if s.channel_bind_timeout == Duration::from_secs(0) {
s.channel_bind_timeout = DEFAULT_LIFETIME;
}
for p in config.conn_configs.into_iter() {
let nonces = Arc::clone(&s.nonces);
let auth_handler = Arc::clone(&s.auth_handler);
let realm = s.realm.clone();
let channel_bind_timeout = s.channel_bind_timeout;
tokio::spawn(async move {
let allocation_manager = Arc::new(Manager::new(ManagerConfig {
relay_addr_generator: p.relay_addr_generator,
}));
let _ = Server::read_loop(
p.conn,
allocation_manager,
nonces,
auth_handler,
realm,
channel_bind_timeout,
)
.await;
});
}
Ok(s)
}
async fn read_loop(
conn: Arc<dyn Conn + Send + Sync>,
allocation_manager: Arc<Manager>,
nonces: Arc<Mutex<HashMap<String, Instant>>>,
auth_handler: Arc<Box<dyn AuthHandler + Send + Sync>>,
realm: String,
channel_bind_timeout: Duration,
) {
let mut buf = vec![0u8; INBOUND_MTU];
loop {
let (n, addr) = match conn.recv_from(&mut buf).await {
Ok((n, addr)) => (n, addr),
Err(err) => {
log::debug!("exit read loop on error: {}", err);
break;
}
};
let mut r = Request {
conn: Arc::clone(&conn),
src_addr: addr,
buff: buf[..n].to_vec(),
allocation_manager: Arc::clone(&allocation_manager),
nonces: Arc::clone(&nonces),
auth_handler: Arc::clone(&auth_handler),
realm: realm.clone(),
channel_bind_timeout,
};
if let Err(err) = r.handle_request().await {
log::error!("error when handling datagram: {}", err);
}
}
let _ = allocation_manager.close().await;
}
pub fn close(&self) -> Result<(), Error> {
Ok(())
}
}