turn_server/
api.rs

1use std::{net::SocketAddr, sync::Arc, time::Instant};
2
3use axum::{
4    Json, Router,
5    extract::{Query, State},
6    http::StatusCode,
7    response::{IntoResponse, Sse, sse::KeepAlive},
8    routing::{delete, get},
9};
10
11use serde::Deserialize;
12use serde_json::json;
13use tokio::net::TcpListener;
14
15use crate::{
16    config::Config,
17    observer::Observer,
18    statistics::Statistics,
19    turn::{PortAllocatePools, Service, SessionAddr},
20};
21
/// Shared state made available to the API route handlers.
///
/// A single instance is built in `start_server`, wrapped in an `Arc` and
/// attached to the router, so handlers receive it through the `State`
/// extractor.
struct ApiState {
    /// Server configuration; `/info` reports `turn.interfaces` from it.
    config: Arc<Config>,
    /// TURN service whose session table the `/info` and `/session` routes
    /// read and refresh.
    service: Service<Observer>,
    /// Per-session traffic counters returned by `/session/statistics`.
    statistics: Statistics,
    /// Instant captured when the API state was created; `/info` reports the
    /// number of whole seconds elapsed since then.
    uptime: Instant,
}
28
/// Query-string parameters accepted by the session routes.
///
/// Both values are parsed as socket addresses and together are converted into
/// the `SessionAddr` used to look a session up.
#[derive(Deserialize)]
struct QueryParams {
    /// Copied into `SessionAddr::address`.
    // NOTE(review): presumably the client's address — confirm against `SessionAddr`.
    address: SocketAddr,
    /// Copied into `SessionAddr::interface`.
    // NOTE(review): presumably the server interface the client reached — confirm.
    interface: SocketAddr,
}
34
35impl Into<SessionAddr> for QueryParams {
36    fn into(self) -> SessionAddr {
37        SessionAddr {
38            address: self.address,
39            interface: self.interface,
40        }
41    }
42}
43
44pub mod events {
45    use std::sync::LazyLock;
46
47    use axum::response::sse::Event;
48    use serde::Serialize;
49    use tokio::sync::broadcast::{Sender, channel};
50    use tokio_stream::wrappers::BroadcastStream;
51
52    static CHANNEL: LazyLock<Sender<Event>> = LazyLock::new(|| channel(10).0);
53
54    pub fn get_event_stream() -> BroadcastStream<Event> {
55        BroadcastStream::new(CHANNEL.subscribe())
56    }
57
58    pub fn send_with_stream<T, F>(event: &str, handle: F)
59    where
60        F: FnOnce() -> T,
61        T: Serialize,
62    {
63        if CHANNEL.receiver_count() > 0 {
64            let _ = CHANNEL.send(Event::default().event(event).json_data(handle()).unwrap());
65        }
66    }
67}
68
69/// start http server
70///
71/// Create an http server and start it, and you can access the controller
72/// instance through the http interface.
73///
74/// Warn: This http server does not contain
75/// any means of authentication, and sensitive information and dangerous
76/// operations can be obtained through this service, please do not expose it
77/// directly to an unsafe environment.
78pub async fn start_server(
79    config: Arc<Config>,
80    service: Service<Observer>,
81    statistics: Statistics,
82) -> anyhow::Result<()> {
83    let state = Arc::new(ApiState {
84        config: config.clone(),
85        uptime: Instant::now(),
86        service,
87        statistics,
88    });
89
90    #[allow(unused_mut)]
91    let mut app = Router::new()
92        .route(
93            "/info",
94            get(|State(app_state): State<Arc<ApiState>>| async move {
95                let sessions = app_state.service.get_sessions();
96                Json(json!({
97                    "software": crate::SOFTWARE,
98                    "uptime": app_state.uptime.elapsed().as_secs(),
99                    "interfaces": app_state.config.turn.interfaces,
100                    "port_capacity": PortAllocatePools::capacity(),
101                    "port_allocated": sessions.allocated(),
102                }))
103            }),
104        )
105        .route(
106            "/session",
107            get(
108                |Query(query): Query<QueryParams>, State(state): State<Arc<ApiState>>| async move {
109                    if let Some(session) = state.service.get_sessions().get_session(&query.into()).get_ref() {
110                        Json(json!({
111                            "username": session.auth.username,
112                            "permissions": session.permissions,
113                            "channels": session.allocate.channels,
114                            "port": session.allocate.port,
115                            "expires": session.expires,
116                        }))
117                        .into_response()
118                    } else {
119                        StatusCode::NOT_FOUND.into_response()
120                    }
121                },
122            ),
123        )
124        .route(
125            "/session/statistics",
126            get(
127                |Query(query): Query<QueryParams>, State(state): State<Arc<ApiState>>| async move {
128                    let addr: SessionAddr = query.into();
129                    if let Some(counts) = state.statistics.get(&addr) {
130                        Json(json!({
131                            "received_bytes": counts.received_bytes,
132                            "send_bytes": counts.send_bytes,
133                            "received_pkts": counts.received_pkts,
134                            "send_pkts": counts.send_pkts,
135                            "error_pkts": counts.error_pkts,
136                        }))
137                        .into_response()
138                    } else {
139                        StatusCode::NOT_FOUND.into_response()
140                    }
141                },
142            ),
143        )
144        .route(
145            "/session",
146            delete(
147                |Query(query): Query<QueryParams>, State(state): State<Arc<ApiState>>| async move {
148                    if state.service.get_sessions().refresh(&query.into(), 0) {
149                        StatusCode::OK
150                    } else {
151                        StatusCode::EXPECTATION_FAILED
152                    }
153                },
154            ),
155        )
156        .route(
157            "/events",
158            get(|| async move { Sse::new(events::get_event_stream()).keep_alive(KeepAlive::default()) }),
159        );
160
161    #[cfg(feature = "prometheus")]
162    {
163        use crate::statistics::prometheus::generate_metrics;
164        use axum::http::header::CONTENT_TYPE;
165
166        let mut metrics_bytes = Vec::with_capacity(4096);
167
168        app = app.route(
169            "/metrics",
170            get(|| async move {
171                metrics_bytes.clear();
172
173                if generate_metrics(&mut metrics_bytes).is_err() {
174                    StatusCode::EXPECTATION_FAILED.into_response()
175                } else {
176                    ([(CONTENT_TYPE, "text/plain")], metrics_bytes).into_response()
177                }
178            }),
179        );
180    }
181
182    let listener = TcpListener::bind(config.api.bind).await?;
183
184    log::info!("api server listening={:?}", &config.api.bind);
185
186    axum::serve(listener, app.with_state(state)).await?;
187    Ok(())
188}