1use crate::{
2 app::AppState,
3 call::{
4 ActiveCall, ActiveCallType, Command,
5 active_call::{ActiveCallGuard, CallParams},
6 },
7 handler::playbook,
8 playbook::{Playbook, PlaybookRunner},
9};
10use crate::{event::SessionEvent, media::track::TrackConfig};
11use axum::{
12 Json, Router,
13 extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
14 response::{IntoResponse, Response},
15 routing::get,
16};
17use bytes::Bytes;
18use chrono::Utc;
19use futures::{SinkExt, StreamExt};
20use rustrtc::IceServer;
21use serde_json::json;
22use std::{path::PathBuf, sync::Arc, time::Duration};
23use tokio::{join, select};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, warn};
26use uuid::Uuid;
27
28pub fn call_router() -> Router<AppState> {
29 let r = Router::new()
30 .route("/call", get(ws_handler))
31 .route("/call/webrtc", get(webrtc_handler))
32 .route("/call/sip", get(sip_handler))
33 .route("/list", get(list_active_calls))
34 .route("/kill/{id}", get(kill_active_call));
35 r
36}
37
38pub fn iceservers_router() -> Router<AppState> {
39 let r = Router::new();
40 r.route("/iceservers", get(get_iceservers))
41}
42
43pub fn playbook_router() -> Router<AppState> {
44 Router::new()
45 .route("/api/playbooks", get(playbook::list_playbooks))
46 .route(
47 "/api/playbooks/{name}",
48 get(playbook::get_playbook).post(playbook::save_playbook),
49 )
50 .route(
51 "/api/playbook/run",
52 axum::routing::post(playbook::run_playbook),
53 )
54 .route("/api/records", get(playbook::list_records))
55}
56
57pub async fn ws_handler(
58 ws: WebSocketUpgrade,
59 State(state): State<AppState>,
60 Query(params): Query<CallParams>,
61) -> Response {
62 call_handler(ActiveCallType::WebSocket, ws, state, params).await
63}
64
65pub async fn sip_handler(
66 ws: WebSocketUpgrade,
67 State(state): State<AppState>,
68 Query(params): Query<CallParams>,
69) -> Response {
70 call_handler(ActiveCallType::Sip, ws, state, params).await
71}
72
73pub async fn webrtc_handler(
74 ws: WebSocketUpgrade,
75 State(state): State<AppState>,
76 Query(params): Query<CallParams>,
77) -> Response {
78 call_handler(ActiveCallType::Webrtc, ws, state, params).await
79}
80
81pub async fn call_handler_core(
83 call_type: ActiveCallType,
84 session_id: String,
85 app_state: AppState,
86 cancel_token: CancellationToken,
87 audio_receiver: tokio::sync::mpsc::UnboundedReceiver<Bytes>,
88 server_side_track: Option<String>,
89 dump_events: bool,
90 ping_interval: u64,
91 mut command_receiver: tokio::sync::mpsc::UnboundedReceiver<Command>,
92 event_sender_to_client: tokio::sync::mpsc::UnboundedSender<crate::event::SessionEvent>,
93) {
94 let _cancel_guard = cancel_token.clone().drop_guard();
95 let track_config = TrackConfig::default();
96 let active_call = Arc::new(ActiveCall::new(
97 call_type.clone(),
98 cancel_token.clone(),
99 session_id.clone(),
100 app_state.invitation.clone(),
101 app_state.clone(),
102 track_config,
103 Some(audio_receiver),
104 dump_events,
105 server_side_track,
106 None, ));
108
109 {
111 let mut pending = app_state.pending_playbooks.lock().await;
112 if let Some(name_or_content) = pending.remove(&session_id) {
113 let variables = active_call.call_state.read().await.extras.clone();
114 let playbook_result = if name_or_content.trim().starts_with("---") {
115 Playbook::parse(&name_or_content, variables.as_ref())
116 } else {
117 let path = if name_or_content.starts_with("config/playbook/") {
119 PathBuf::from(&name_or_content)
120 } else {
121 PathBuf::from("config/playbook").join(&name_or_content)
122 };
123 Playbook::load(path, variables.as_ref()).await
124 };
125
126 match playbook_result {
127 Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
128 Ok(runner) => {
129 crate::spawn(async move {
130 runner.run().await;
131 });
132 let display_name = if name_or_content.trim().starts_with("---") {
133 "custom content"
134 } else {
135 &name_or_content
136 };
137 info!(session_id, "Playbook runner started for {}", display_name);
138 }
139 Err(e) => {
140 let display_name = if name_or_content.trim().starts_with("---") {
141 "custom content"
142 } else {
143 &name_or_content
144 };
145 warn!(
146 session_id,
147 "Failed to create runner {}: {}", display_name, e
148 )
149 }
150 },
151 Err(e) => {
152 let display_name = if name_or_content.trim().starts_with("---") {
153 "custom content"
154 } else {
155 &name_or_content
156 };
157 warn!(
158 session_id,
159 "Failed to load playbook {}: {}", display_name, e
160 );
161 let event = SessionEvent::Error {
162 timestamp: crate::media::get_timestamp(),
163 track_id: session_id.clone(),
164 sender: "playbook".to_string(),
165 error: format!("{}", e),
166 code: None,
167 };
168 event_sender_to_client.send(event).ok();
169 return;
170 }
171 }
172 }
173 }
174
175 let recv_commands_loop = async {
176 while let Some(command) = command_receiver.recv().await {
177 if let Err(_) = active_call.enqueue_command(command).await {
178 break;
179 }
180 }
181 };
182
183 let mut event_receiver = active_call.event_sender.subscribe();
184 let send_events_loop = async {
185 loop {
186 match event_receiver.recv().await {
187 Ok(event) => {
188 if let Err(_) = event_sender_to_client.send(event) {
189 break;
190 }
191 }
192 Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
193 Err(_) => break,
194 }
195 }
196 };
197
198 let send_ping_loop = async {
199 if ping_interval == 0 {
200 active_call.cancel_token.cancelled().await;
201 return;
202 }
203 let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval));
204 loop {
205 ticker.tick().await;
206 let payload = Utc::now().to_rfc3339();
207 let event = SessionEvent::Ping {
208 timestamp: crate::media::get_timestamp(),
209 payload: Some(payload),
210 };
211 if let Err(_) = active_call.event_sender.send(event) {
212 break;
213 }
214 }
215 };
216
217 let guard = ActiveCallGuard::new(active_call.clone());
218 info!(
219 session_id,
220 active_calls = guard.active_calls,
221 ?call_type,
222 "new call started"
223 );
224 let receiver = active_call.new_receiver();
225
226 let (r, _) = join! {
227 active_call.serve(receiver),
228 async {
229 select!{
230 _ = send_ping_loop => {},
231 _ = cancel_token.cancelled() => {},
232 _ = send_events_loop => { },
233 _ = recv_commands_loop => {
234 info!(session_id, "Command receiver closed");
235 },
236 }
237 cancel_token.cancel();
238 }
239 };
240 match r {
241 Ok(_) => info!(session_id, "call ended successfully"),
242 Err(e) => warn!(session_id, "call ended with error: {}", e),
243 }
244
245 active_call.cleanup().await.ok();
246 debug!(session_id, "Call handler core completed");
247}
248
249pub async fn call_handler(
250 call_type: ActiveCallType,
251 ws: WebSocketUpgrade,
252 app_state: AppState,
253 params: CallParams,
254) -> Response {
255 let session_id = params
256 .id
257 .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
258 let server_side_track = params.server_side_track.clone();
259 let dump_events = params.dump_events.unwrap_or(true);
260 let ping_interval = params.ping_interval.unwrap_or(20);
261
262 let resp = ws.on_upgrade(move |socket| async move {
263 let (mut ws_sender, mut ws_receiver) = socket.split();
264 let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
265 let (command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel::<Command>();
266 let (event_sender_to_client, mut event_receiver_from_core) =
267 tokio::sync::mpsc::unbounded_channel::<crate::event::SessionEvent>();
268 let cancel_token = CancellationToken::new();
269
270 let session_id_clone = session_id.clone();
272 let app_state_clone = app_state.clone();
273 let cancel_token_clone = cancel_token.clone();
274 crate::spawn(call_handler_core(
275 call_type,
276 session_id_clone,
277 app_state_clone,
278 cancel_token_clone,
279 audio_receiver,
280 server_side_track,
281 dump_events,
282 ping_interval.into(),
283 command_receiver,
284 event_sender_to_client,
285 ));
286
287 let recv_from_ws_loop = async {
289 while let Some(Ok(message)) = ws_receiver.next().await {
290 match message {
291 Message::Text(text) => {
292 let command = match serde_json::from_str::<Command>(&text) {
293 Ok(cmd) => cmd,
294 Err(e) => {
295 warn!(session_id, %text, "Failed to parse command {}", e);
296 continue;
297 }
298 };
299 if let Err(_) = command_sender.send(command) {
300 break;
301 }
302 }
303 Message::Binary(bin) => {
304 audio_sender.send(bin.into()).ok();
305 }
306 Message::Close(_) => {
307 info!(session_id, "WebSocket closed by client");
308 break;
309 }
310 _ => {}
311 }
312 }
313 };
314
315 let send_to_ws_loop = async {
316 while let Some(event) = event_receiver_from_core.recv().await {
317 let message = match event.into_ws_message() {
318 Ok(msg) => msg,
319 Err(_) => continue,
320 };
321 if let Err(_) = ws_sender.send(message).await {
322 break;
323 }
324 }
325 };
326
327 select! {
328 _ = recv_from_ws_loop => {
329 info!(session_id, "WebSocket receive loop ended");
330 },
331 _ = send_to_ws_loop => {
332 info!(session_id, "WebSocket send loop ended");
333 },
334 _ = cancel_token.cancelled() => {
335 info!(session_id, "WebSocket cancelled");
336 },
337 }
338
339 cancel_token.cancel();
340 ws_sender.flush().await.ok();
341 ws_sender.close().await.ok();
342 debug!(session_id, "WebSocket connection closed");
343 });
344 resp
345}
346
347pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
348 if let Some(ice_servers) = state.config.ice_servers.as_ref() {
349 return Json(ice_servers).into_response();
350 }
351 Json(vec![IceServer {
352 urls: vec!["stun:stun.l.google.com:19302".to_string()],
353 ..Default::default()
354 }])
355 .into_response()
356}
357
358pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
359 let calls = state
360 .active_calls
361 .lock()
362 .unwrap()
363 .iter()
364 .map(|(_, c)| {
365 if let Ok(cs) = c.call_state.try_read() {
366 json!({
367 "id": c.session_id,
368 "callType": c.call_type,
369 "cs.option": cs.option,
370 "ringTime": cs.ring_time,
371 "startTime": cs.answer_time,
372 })
373 } else {
374 json!({
375 "id": c.session_id,
376 "callType": c.call_type,
377 "status": "locked",
378 })
379 }
380 })
381 .collect::<Vec<_>>();
382 Json(serde_json::json!({ "active_calls": calls })).into_response()
383}
384
385pub(crate) async fn kill_active_call(
386 Path(id): Path<String>,
387 State(state): State<AppState>,
388) -> Response {
389 let active_calls = state.active_calls.lock().unwrap();
390 if let Some(call) = active_calls.get(&id) {
391 call.cancel_token.cancel();
392 Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
393 } else {
394 Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
395 }
396}
397
398trait IntoWsMessage {
399 fn into_ws_message(self) -> Result<Message, serde_json::Error>;
400}
401
402impl IntoWsMessage for crate::event::SessionEvent {
403 fn into_ws_message(self) -> Result<Message, serde_json::Error> {
404 match self {
405 SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
406 SessionEvent::Ping { timestamp, payload } => {
407 let payload = payload.unwrap_or_else(|| timestamp.to_string());
408 Ok(Message::Ping(payload.into()))
409 }
410 event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
411 }
412 }
413}