1use crate::{
2 app::AppState,
3 call::{
4 ActiveCall, ActiveCallType, Command,
5 active_call::{ActiveCallGuard, CallParams},
6 },
7 handler::playbook,
8 playbook::{Playbook, PlaybookRunner},
9};
10use crate::{event::SessionEvent, media::track::TrackConfig};
11use axum::{
12 Json, Router,
13 extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
14 response::{IntoResponse, Response},
15 routing::get,
16};
17use bytes::Bytes;
18use chrono::Utc;
19use futures::{SinkExt, StreamExt};
20use rustrtc::IceServer;
21use serde_json::json;
22use std::{path::PathBuf, sync::Arc, time::Duration};
23use tokio::{join, select};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, trace, warn};
26use uuid::Uuid;
27
28pub fn call_router() -> Router<AppState> {
29 let r = Router::new()
30 .route("/call", get(ws_handler))
31 .route("/call/webrtc", get(webrtc_handler))
32 .route("/call/sip", get(sip_handler))
33 .route("/list", get(list_active_calls))
34 .route("/kill/{id}", get(kill_active_call));
35 r
36}
37
38pub fn iceservers_router() -> Router<AppState> {
39 let r = Router::new();
40 r.route("/iceservers", get(get_iceservers))
41}
42
43pub fn playbook_router() -> Router<AppState> {
44 Router::new()
45 .route("/api/playbooks", get(playbook::list_playbooks))
46 .route(
47 "/api/playbooks/{name}",
48 get(playbook::get_playbook).post(playbook::save_playbook),
49 )
50 .route(
51 "/api/playbook/run",
52 axum::routing::post(playbook::run_playbook),
53 )
54 .route("/api/records", get(playbook::list_records))
55}
56
57pub async fn ws_handler(
58 ws: WebSocketUpgrade,
59 State(state): State<AppState>,
60 Query(params): Query<CallParams>,
61) -> Response {
62 call_handler(ActiveCallType::WebSocket, ws, state, params).await
63}
64
65pub async fn sip_handler(
66 ws: WebSocketUpgrade,
67 State(state): State<AppState>,
68 Query(params): Query<CallParams>,
69) -> Response {
70 call_handler(ActiveCallType::Sip, ws, state, params).await
71}
72
73pub async fn webrtc_handler(
74 ws: WebSocketUpgrade,
75 State(state): State<AppState>,
76 Query(params): Query<CallParams>,
77) -> Response {
78 call_handler(ActiveCallType::Webrtc, ws, state, params).await
79}
80
81pub async fn call_handler_core(
83 call_type: ActiveCallType,
84 session_id: String,
85 app_state: AppState,
86 cancel_token: CancellationToken,
87 audio_receiver: tokio::sync::mpsc::UnboundedReceiver<Bytes>,
88 server_side_track: Option<String>,
89 dump_events: bool,
90 ping_interval: u64,
91 mut command_receiver: tokio::sync::mpsc::UnboundedReceiver<Command>,
92 event_sender_to_client: tokio::sync::mpsc::UnboundedSender<crate::event::SessionEvent>,
93) {
94 let _cancel_guard = cancel_token.clone().drop_guard();
95 let track_config = TrackConfig::default();
96 let active_call = Arc::new(ActiveCall::new(
97 call_type.clone(),
98 cancel_token.clone(),
99 session_id.clone(),
100 app_state.invitation.clone(),
101 app_state.clone(),
102 track_config,
103 Some(audio_receiver),
104 dump_events,
105 server_side_track,
106 None, ));
108
109 {
111 let mut pending = app_state.pending_playbooks.lock().await;
112 if let Some(name_or_content) = pending.remove(&session_id) {
113 let variables = active_call.call_state.read().await.extras.clone();
114 let playbook_result = if name_or_content.trim().starts_with("---") {
115 Playbook::parse(&name_or_content, variables.as_ref())
116 } else {
117 let path = if name_or_content.starts_with("config/playbook/") {
119 PathBuf::from(&name_or_content)
120 } else {
121 PathBuf::from("config/playbook").join(&name_or_content)
122 };
123 Playbook::load(path, variables.as_ref()).await
124 };
125
126 match playbook_result {
127 Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
128 Ok(runner) => {
129 crate::spawn(async move {
130 runner.run().await;
131 });
132 let display_name = if name_or_content.trim().starts_with("---") {
133 "custom content"
134 } else {
135 &name_or_content
136 };
137 info!(session_id, "Playbook runner started for {}", display_name);
138 }
139 Err(e) => {
140 let display_name = if name_or_content.trim().starts_with("---") {
141 "custom content"
142 } else {
143 &name_or_content
144 };
145 warn!(
146 session_id,
147 "Failed to create runner {}: {}", display_name, e
148 )
149 }
150 },
151 Err(e) => {
152 let display_name = if name_or_content.trim().starts_with("---") {
153 "custom content"
154 } else {
155 &name_or_content
156 };
157 warn!(
158 session_id,
159 "Failed to load playbook {}: {}", display_name, e
160 );
161 let event = SessionEvent::Error {
162 timestamp: crate::media::get_timestamp(),
163 track_id: session_id.clone(),
164 sender: "playbook".to_string(),
165 error: format!("{}", e),
166 code: None,
167 };
168 event_sender_to_client.send(event).ok();
169 return;
170 }
171 }
172 }
173 }
174
175 let recv_commands_loop = async {
176 while let Some(command) = command_receiver.recv().await {
177 if let Err(_) = active_call.enqueue_command(command).await {
178 break;
179 }
180 }
181 };
182
183 let mut event_receiver = active_call.event_sender.subscribe();
184 let send_events_loop = async {
185 loop {
186 match event_receiver.recv().await {
187 Ok(event) => {
188 if let Err(_) = event_sender_to_client.send(event) {
189 break;
190 }
191 }
192 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
193 Err(_) => break,
194 }
195 }
196 };
197
198 let send_ping_loop = async {
199 if ping_interval == 0 {
200 active_call.cancel_token.cancelled().await;
201 return;
202 }
203 let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval));
204 loop {
205 ticker.tick().await;
206 let payload = Utc::now().to_rfc3339();
207 let event = SessionEvent::Ping {
208 timestamp: crate::media::get_timestamp(),
209 payload: Some(payload),
210 };
211 if let Err(_) = active_call.event_sender.send(event) {
212 break;
213 }
214 }
215 };
216
217 let guard = ActiveCallGuard::new(active_call.clone());
218 info!(
219 session_id,
220 active_calls = guard.active_calls,
221 ?call_type,
222 "new call started"
223 );
224 let receiver = active_call.new_receiver();
225
226 let (r, _) = join! {
227 active_call.serve(receiver),
228 async {
229 select!{
230 _ = send_ping_loop => {},
231 _ = cancel_token.cancelled() => {},
232 _ = send_events_loop => { },
233 _ = recv_commands_loop => {
234 info!(session_id, "Command receiver closed");
235 },
236 }
237 cancel_token.cancel();
238 }
239 };
240 match r {
241 Ok(_) => info!(session_id, "call ended successfully"),
242 Err(e) => warn!(session_id, "call ended with error: {}", e),
243 }
244
245 active_call.cleanup().await.ok();
246 debug!(session_id, "Call handler core completed");
247}
248
249pub async fn call_handler(
250 call_type: ActiveCallType,
251 ws: WebSocketUpgrade,
252 app_state: AppState,
253 params: CallParams,
254) -> Response {
255 let session_id = params
256 .id
257 .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
258 let server_side_track = params.server_side_track.clone();
259 let dump_events = params.dump_events.unwrap_or(true);
260 let ping_interval = params.ping_interval.unwrap_or(20);
261
262 let resp = ws.on_upgrade(move |socket| async move {
263 let (mut ws_sender, mut ws_receiver) = socket.split();
264 let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
265 let (command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel::<Command>();
266 let (event_sender_to_client, mut event_receiver_from_core) =
267 tokio::sync::mpsc::unbounded_channel::<crate::event::SessionEvent>();
268 let cancel_token = CancellationToken::new();
269
270 let session_id_clone = session_id.clone();
272 let app_state_clone = app_state.clone();
273 let cancel_token_clone = cancel_token.clone();
274 crate::spawn(call_handler_core(
275 call_type,
276 session_id_clone,
277 app_state_clone,
278 cancel_token_clone,
279 audio_receiver,
280 server_side_track,
281 dump_events,
282 ping_interval.into(),
283 command_receiver,
284 event_sender_to_client,
285 ));
286
287 let recv_from_ws_loop = async {
289 while let Some(Ok(message)) = ws_receiver.next().await {
290 match message {
291 Message::Text(text) => {
292 let command = match serde_json::from_str::<Command>(&text) {
293 Ok(cmd) => cmd,
294 Err(e) => {
295 warn!(session_id, %text, "Failed to parse command {}", e);
296 continue;
297 }
298 };
299 if let Err(_) = command_sender.send(command) {
300 break;
301 }
302 }
303 Message::Binary(bin) => {
304 audio_sender.send(bin.into()).ok();
305 }
306 Message::Close(_) => {
307 info!(session_id, "WebSocket closed by client");
308 break;
309 }
310 _ => {}
311 }
312 }
313 };
314
315 let send_to_ws_loop = async {
316 while let Some(event) = event_receiver_from_core.recv().await {
317 trace!(session_id, %event, "Sending WS message");
318 let message = match event.into_ws_message() {
319 Ok(msg) => msg,
320 Err(e) => {
321 warn!(session_id, error=%e, "Failed to serialize event to WS message");
322 continue;
323 }
324 };
325 if let Err(_) = ws_sender.send(message).await {
326 info!(session_id, "WebSocket send failed, closing");
327 break;
328 }
329 }
330 };
331
332 select! {
333 _ = recv_from_ws_loop => {
334 info!(session_id, "WebSocket receive loop ended");
335 },
336 _ = send_to_ws_loop => {
337 info!(session_id, "WebSocket send loop ended");
338 },
339 _ = cancel_token.cancelled() => {
340 info!(session_id, "WebSocket cancelled");
341 },
342 }
343
344 cancel_token.cancel();
345 ws_sender.flush().await.ok();
346 ws_sender.close().await.ok();
347 debug!(session_id, "WebSocket connection closed");
348 });
349 resp
350}
351
352pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
353 if let Some(ice_servers) = state.config.ice_servers.as_ref() {
354 return Json(ice_servers).into_response();
355 }
356 Json(vec![IceServer {
357 urls: vec!["stun:stun.l.google.com:19302".to_string()],
358 ..Default::default()
359 }])
360 .into_response()
361}
362
363pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
364 let calls = state
365 .active_calls
366 .lock()
367 .unwrap()
368 .iter()
369 .map(|(_, c)| {
370 if let Ok(cs) = c.call_state.try_read() {
371 json!({
372 "id": c.session_id,
373 "callType": c.call_type,
374 "cs.option": cs.option,
375 "ringTime": cs.ring_time,
376 "startTime": cs.answer_time,
377 })
378 } else {
379 json!({
380 "id": c.session_id,
381 "callType": c.call_type,
382 "status": "locked",
383 })
384 }
385 })
386 .collect::<Vec<_>>();
387 Json(serde_json::json!({ "active_calls": calls })).into_response()
388}
389
390pub(crate) async fn kill_active_call(
391 Path(id): Path<String>,
392 State(state): State<AppState>,
393) -> Response {
394 let active_calls = state.active_calls.lock().unwrap();
395 if let Some(call) = active_calls.get(&id) {
396 call.cancel_token.cancel();
397 Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
398 } else {
399 Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
400 }
401}
402
403trait IntoWsMessage {
404 fn into_ws_message(self) -> Result<Message, serde_json::Error>;
405}
406
407impl IntoWsMessage for crate::event::SessionEvent {
408 fn into_ws_message(self) -> Result<Message, serde_json::Error> {
409 match self {
410 SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
411 SessionEvent::Ping { timestamp, payload } => {
412 let payload = payload.unwrap_or_else(|| timestamp.to_string());
413 Ok(Message::Ping(payload.into()))
414 }
415 event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
416 }
417 }
418}