1use crate::{
2 app::AppState,
3 call::{
4 ActiveCall, ActiveCallType, Command,
5 active_call::{ActiveCallGuard, CallParams},
6 },
7 handler::playbook,
8 playbook::{Playbook, PlaybookRunner},
9};
10use crate::{event::SessionEvent, media::track::TrackConfig};
11use axum::{
12 Json, Router,
13 extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
14 response::{IntoResponse, Response},
15 routing::get,
16};
17use bytes::Bytes;
18use chrono::Utc;
19use futures::{SinkExt, StreamExt};
20use rustrtc::IceServer;
21use serde_json::json;
22use std::{path::PathBuf, sync::Arc, time::Duration};
23use tokio::{join, select};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, warn};
26use uuid::Uuid;
27
28pub fn call_router() -> Router<AppState> {
29 let r = Router::new()
30 .route("/call", get(ws_handler))
31 .route("/call/webrtc", get(webrtc_handler))
32 .route("/call/sip", get(sip_handler))
33 .route("/list", get(list_active_calls))
34 .route("/kill/{id}", get(kill_active_call));
35 r
36}
37
38pub fn iceservers_router() -> Router<AppState> {
39 let r = Router::new();
40 r.route("/iceservers", get(get_iceservers))
41}
42
43pub fn playbook_router() -> Router<AppState> {
44 Router::new()
45 .route("/api/playbooks", get(playbook::list_playbooks))
46 .route(
47 "/api/playbooks/{name}",
48 get(playbook::get_playbook).post(playbook::save_playbook),
49 )
50 .route(
51 "/api/playbook/run",
52 axum::routing::post(playbook::run_playbook),
53 )
54 .route("/api/records", get(playbook::list_records))
55}
56
57pub async fn ws_handler(
58 ws: WebSocketUpgrade,
59 State(state): State<AppState>,
60 Query(params): Query<CallParams>,
61) -> Response {
62 call_handler(ActiveCallType::WebSocket, ws, state, params).await
63}
64
65pub async fn sip_handler(
66 ws: WebSocketUpgrade,
67 State(state): State<AppState>,
68 Query(params): Query<CallParams>,
69) -> Response {
70 call_handler(ActiveCallType::Sip, ws, state, params).await
71}
72
73pub async fn webrtc_handler(
74 ws: WebSocketUpgrade,
75 State(state): State<AppState>,
76 Query(params): Query<CallParams>,
77) -> Response {
78 call_handler(ActiveCallType::Webrtc, ws, state, params).await
79}
80
81pub async fn call_handler(
82 call_type: ActiveCallType,
83 ws: WebSocketUpgrade,
84 app_state: AppState,
85 params: CallParams,
86) -> Response {
87 let session_id = params
88 .id
89 .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
90 let server_side_track = params.server_side_track.clone();
91 let dump_events = params.dump_events.unwrap_or(true);
92 let ping_interval = params.ping_interval.unwrap_or(20);
93
94 let resp = ws.on_upgrade(move |socket| async move {
95 let (mut ws_sender, mut ws_receiver) = socket.split();
96 let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
97 let cancel_token = CancellationToken::new();
98 let track_config = TrackConfig::default();
99 let active_call = Arc::new(ActiveCall::new(
100 call_type.clone(),
101 cancel_token.clone(),
102 session_id.clone(),
103 app_state.invitation.clone(),
104 app_state.clone(),
105 track_config,
106 Some(audio_receiver),
107 dump_events,
108 server_side_track,
109 None, ));
111
112 {
114 let mut pending = app_state.pending_playbooks.lock().await;
115 if let Some(name_or_content) = pending.remove(&session_id) {
116 let variables = active_call.call_state.read().await.extras.clone();
117 let playbook_result = if name_or_content.trim().starts_with("---") {
118 Playbook::parse(&name_or_content, variables.as_ref())
119 } else {
120 let path = PathBuf::from("config/playbook").join(&name_or_content);
121 Playbook::load(path, variables.as_ref()).await
122 };
123
124 match playbook_result {
125 Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
126 Ok(runner) => {
127 crate::spawn(async move {
128 runner.run().await;
129 });
130 let display_name = if name_or_content.trim().starts_with("---") {
131 "custom content"
132 } else {
133 &name_or_content
134 };
135 info!(session_id, "Playbook runner started for {}", display_name);
136 }
137 Err(e) => {
138 let display_name = if name_or_content.trim().starts_with("---") {
139 "custom content"
140 } else {
141 &name_or_content
142 };
143 warn!(
144 session_id,
145 "Failed to create runner {}: {}", display_name, e
146 )
147 }
148 },
149 Err(e) => {
150 let display_name = if name_or_content.trim().starts_with("---") {
151 "custom content"
152 } else {
153 &name_or_content
154 };
155 warn!(
156 session_id,
157 "Failed to load playbook {}: {}", display_name, e
158 );
159 let event = SessionEvent::Error {
160 timestamp: crate::media::get_timestamp(),
161 track_id: session_id,
162 sender: "playbook".to_string(),
163 error: format!("{}", e),
164 code: None,
165 };
166 let message = match event.into_ws_message() {
167 Ok(msg) => msg,
168 Err(_) => return,
169 };
170 ws_sender.send(message).await.ok();
171 return;
172 }
173 }
174 }
175 }
176
177 let recv_from_ws_loop = async {
178 while let Some(Ok(message)) = ws_receiver.next().await {
179 match message {
180 Message::Text(text) => {
181 let command = match serde_json::from_str::<Command>(&text) {
182 Ok(cmd) => cmd,
183 Err(e) => {
184 warn!(session_id, %text, "Failed to parse command {}",e);
185 continue;
186 }
187 };
188 if let Err(_) = active_call.enqueue_command(command).await {
189 break;
190 }
191 }
192 Message::Binary(bin) => {
193 audio_sender.send(bin.into()).ok();
194 }
195 Message::Close(_) => {
196 info!(session_id, "WebSocket closed by client");
197 break;
198 }
199 _ => {}
200 }
201 }
202 };
203
204 let mut event_receiver = active_call.event_sender.subscribe();
205 let send_to_ws_loop = async {
206 loop {
207 match event_receiver.recv().await {
208 Ok(event) => {
209 let message = match event.into_ws_message() {
210 Ok(msg) => msg,
211 Err(_) => continue,
212 };
213 if let Err(_) = ws_sender.send(message).await {
214 break;
215 }
216 }
217 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
218 Err(_) => break,
219 }
220 }
221 };
222
223 let send_ping_loop = async {
224 if ping_interval == 0 {
225 active_call.cancel_token.cancelled().await;
226 return;
227 }
228 let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval.into()));
229 loop {
230 ticker.tick().await;
231 let payload = Utc::now().to_rfc3339();
232 let event = SessionEvent::Ping {
233 timestamp: crate::media::get_timestamp(),
234 payload: Some(payload),
235 };
236 if let Err(_) = active_call.event_sender.send(event) {
237 break;
238 }
239 }
240 };
241 let guard = ActiveCallGuard::new(active_call.clone());
242 info!(
243 session_id,
244 active_calls = guard.active_calls,
245 ?call_type,
246 "new call started"
247 );
248 let receiver = active_call.new_receiver();
249
250 let (r, _) = join! {
251 active_call.serve(receiver),
252 async {
253 select!{
254 _ = send_ping_loop => {},
255 _ = cancel_token.cancelled() => {},
256 _ = send_to_ws_loop => { },
257 _ = recv_from_ws_loop => {
258 info!(session_id, "WebSocket closed by client");
259 },
260 }
261 cancel_token.cancel();
262 }
263 };
264 match r {
265 Ok(_) => info!(session_id, "call ended successfully"),
266 Err(e) => warn!(session_id, "call ended with error: {}", e),
267 }
268
269 active_call.cleanup().await.ok();
270 while let Ok(event) = event_receiver.try_recv() {
272 let message = match event.into_ws_message() {
273 Ok(msg) => msg,
274 Err(_) => continue,
275 };
276 if let Err(_) = ws_sender.send(message).await {
277 break;
278 }
279 }
280 ws_sender.flush().await.ok();
281 ws_sender.close().await.ok();
282 debug!(session_id, "WebSocket connection closed");
283 });
284 resp
285}
286
287pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
288 if let Some(ice_servers) = state.config.ice_servers.as_ref() {
289 return Json(ice_servers).into_response();
290 }
291 Json(vec![IceServer {
292 urls: vec!["stun:stun.l.google.com:19302".to_string()],
293 ..Default::default()
294 }])
295 .into_response()
296}
297
298pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
299 let calls = state
300 .active_calls
301 .lock()
302 .unwrap()
303 .iter()
304 .map(|(_, c)| {
305 if let Ok(cs) = c.call_state.try_read() {
306 json!({
307 "id": c.session_id,
308 "callType": c.call_type,
309 "cs.option": cs.option,
310 "ringTime": cs.ring_time,
311 "startTime": cs.answer_time,
312 })
313 } else {
314 json!({
315 "id": c.session_id,
316 "callType": c.call_type,
317 "status": "locked",
318 })
319 }
320 })
321 .collect::<Vec<_>>();
322 Json(serde_json::json!({ "active_calls": calls })).into_response()
323}
324
325pub(crate) async fn kill_active_call(
326 Path(id): Path<String>,
327 State(state): State<AppState>,
328) -> Response {
329 let active_calls = state.active_calls.lock().unwrap();
330 if let Some(call) = active_calls.get(&id) {
331 call.cancel_token.cancel();
332 Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
333 } else {
334 Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
335 }
336}
337
338trait IntoWsMessage {
339 fn into_ws_message(self) -> Result<Message, serde_json::Error>;
340}
341
342impl IntoWsMessage for crate::event::SessionEvent {
343 fn into_ws_message(self) -> Result<Message, serde_json::Error> {
344 match self {
345 SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
346 SessionEvent::Ping { timestamp, payload } => {
347 let payload = payload.unwrap_or_else(|| timestamp.to_string());
348 Ok(Message::Ping(payload.into()))
349 }
350 event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
351 }
352 }
353}