1use std::{net::SocketAddr, sync::Arc, time::Instant};
2
3use axum::{
4 Json, Router,
5 extract::{Query, State},
6 http::StatusCode,
7 response::{IntoResponse, Sse, sse::KeepAlive},
8 routing::{delete, get},
9};
10
11use serde::Deserialize;
12use serde_json::json;
13use tokio::net::TcpListener;
14
15use crate::{
16 config::Config,
17 observer::Observer,
18 statistics::Statistics,
19 turn::{PortAllocatePools, Service, SessionAddr},
20};
21
22struct ApiState {
23 config: Arc<Config>,
24 service: Service<Observer>,
25 statistics: Statistics,
26 uptime: Instant,
27}
28
29#[derive(Deserialize)]
30struct QueryParams {
31 address: SocketAddr,
32 interface: SocketAddr,
33}
34
35impl Into<SessionAddr> for QueryParams {
36 fn into(self) -> SessionAddr {
37 SessionAddr {
38 address: self.address,
39 interface: self.interface,
40 }
41 }
42}
43
44pub mod events {
45 use std::sync::LazyLock;
46
47 use axum::response::sse::Event;
48 use serde::Serialize;
49 use tokio::sync::broadcast::{Sender, channel};
50 use tokio_stream::wrappers::BroadcastStream;
51
52 static CHANNEL: LazyLock<Sender<Event>> = LazyLock::new(|| channel(10).0);
53
54 pub fn get_event_stream() -> BroadcastStream<Event> {
55 BroadcastStream::new(CHANNEL.subscribe())
56 }
57
58 pub fn send_with_stream<T, F>(event: &str, handle: F)
59 where
60 F: FnOnce() -> T,
61 T: Serialize,
62 {
63 if CHANNEL.receiver_count() > 0 {
64 let _ = CHANNEL.send(Event::default().event(event).json_data(handle()).unwrap());
65 }
66 }
67}
68
69pub async fn start_server(
79 config: Arc<Config>,
80 service: Service<Observer>,
81 statistics: Statistics,
82) -> anyhow::Result<()> {
83 let state = Arc::new(ApiState {
84 config: config.clone(),
85 uptime: Instant::now(),
86 service,
87 statistics,
88 });
89
90 #[allow(unused_mut)]
91 let mut app = Router::new()
92 .route(
93 "/info",
94 get(|State(app_state): State<Arc<ApiState>>| async move {
95 let sessions = app_state.service.get_sessions();
96 Json(json!({
97 "software": crate::SOFTWARE,
98 "uptime": app_state.uptime.elapsed().as_secs(),
99 "interfaces": app_state.config.turn.interfaces,
100 "port_capacity": PortAllocatePools::capacity(),
101 "port_allocated": sessions.allocated(),
102 }))
103 }),
104 )
105 .route(
106 "/session",
107 get(
108 |Query(query): Query<QueryParams>, State(state): State<Arc<ApiState>>| async move {
109 if let Some(session) = state.service.get_sessions().get_session(&query.into()).get_ref() {
110 Json(json!({
111 "username": session.auth.username,
112 "permissions": session.permissions,
113 "channels": session.allocate.channels,
114 "port": session.allocate.port,
115 "expires": session.expires,
116 }))
117 .into_response()
118 } else {
119 StatusCode::NOT_FOUND.into_response()
120 }
121 },
122 ),
123 )
124 .route(
125 "/session/statistics",
126 get(
127 |Query(query): Query<QueryParams>, State(state): State<Arc<ApiState>>| async move {
128 let addr: SessionAddr = query.into();
129 if let Some(counts) = state.statistics.get(&addr) {
130 Json(json!({
131 "received_bytes": counts.received_bytes,
132 "send_bytes": counts.send_bytes,
133 "received_pkts": counts.received_pkts,
134 "send_pkts": counts.send_pkts,
135 "error_pkts": counts.error_pkts,
136 }))
137 .into_response()
138 } else {
139 StatusCode::NOT_FOUND.into_response()
140 }
141 },
142 ),
143 )
144 .route(
145 "/session",
146 delete(
147 |Query(query): Query<QueryParams>, State(state): State<Arc<ApiState>>| async move {
148 if state.service.get_sessions().refresh(&query.into(), 0) {
149 StatusCode::OK
150 } else {
151 StatusCode::EXPECTATION_FAILED
152 }
153 },
154 ),
155 )
156 .route(
157 "/events",
158 get(|| async move { Sse::new(events::get_event_stream()).keep_alive(KeepAlive::default()) }),
159 );
160
161 #[cfg(feature = "prometheus")]
162 {
163 use crate::statistics::prometheus::generate_metrics;
164 use axum::http::header::CONTENT_TYPE;
165
166 let mut metrics_bytes = Vec::with_capacity(4096);
167
168 app = app.route(
169 "/metrics",
170 get(|| async move {
171 metrics_bytes.clear();
172
173 if generate_metrics(&mut metrics_bytes).is_err() {
174 StatusCode::EXPECTATION_FAILED.into_response()
175 } else {
176 ([(CONTENT_TYPE, "text/plain")], metrics_bytes).into_response()
177 }
178 }),
179 );
180 }
181
182 let listener = TcpListener::bind(config.api.bind).await?;
183
184 log::info!("api server listening={:?}", &config.api.bind);
185
186 axum::serve(listener, app.with_state(state)).await?;
187 Ok(())
188}