1use crate::{
2 app::AppState,
3 call::{
4 ActiveCall, ActiveCallType, Command,
5 active_call::{ActiveCallGuard, CallParams},
6 },
7 handler::playbook,
8 playbook::{Playbook, PlaybookRunner},
9};
10use crate::{event::SessionEvent, media::track::TrackConfig};
11use axum::{
12 Json, Router,
13 extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
14 response::{IntoResponse, Response},
15 routing::get,
16};
17use bytes::Bytes;
18use chrono::Utc;
19use futures::{SinkExt, StreamExt};
20use rustrtc::IceServer;
21use serde_json::json;
22use std::{path::PathBuf, sync::Arc, time::Duration};
23use tokio::{join, select};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, warn};
26use uuid::Uuid;
27
28pub fn call_router() -> Router<AppState> {
29 Router::new()
30 .route("/call", get(ws_handler))
31 .route("/call/webrtc", get(webrtc_handler))
32 .route("/call/sip", get(sip_handler))
33 .route("/iceservers", get(get_iceservers))
34 .route("/list", get(list_active_calls))
35 .route("/kill/{id}", get(kill_active_call))
36}
37
38pub fn playbook_router() -> Router<AppState> {
39 Router::new()
40 .route("/api/playbooks", get(playbook::list_playbooks))
41 .route(
42 "/api/playbooks/{name}",
43 get(playbook::get_playbook).post(playbook::save_playbook),
44 )
45 .route(
46 "/api/playbook/run",
47 axum::routing::post(playbook::run_playbook),
48 )
49 .route("/api/records", get(playbook::list_records))
50}
51
52pub async fn ws_handler(
53 ws: WebSocketUpgrade,
54 State(state): State<AppState>,
55 Query(params): Query<CallParams>,
56) -> Response {
57 call_handler(ActiveCallType::WebSocket, ws, state, params).await
58}
59
60pub async fn sip_handler(
61 ws: WebSocketUpgrade,
62 State(state): State<AppState>,
63 Query(params): Query<CallParams>,
64) -> Response {
65 call_handler(ActiveCallType::Sip, ws, state, params).await
66}
67
68pub async fn webrtc_handler(
69 ws: WebSocketUpgrade,
70 State(state): State<AppState>,
71 Query(params): Query<CallParams>,
72) -> Response {
73 call_handler(ActiveCallType::Webrtc, ws, state, params).await
74}
75
76pub async fn call_handler(
77 call_type: ActiveCallType,
78 ws: WebSocketUpgrade,
79 app_state: AppState,
80 params: CallParams,
81) -> Response {
82 let session_id = params
83 .id
84 .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
85 let server_side_track = params.server_side_track.clone();
86 let dump_events = params.dump_events.unwrap_or(true);
87 let ping_interval = params.ping_interval.unwrap_or(20);
88
89 let resp = ws.on_upgrade(move |socket| async move {
90 let (mut ws_sender, mut ws_receiver) = socket.split();
91 let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
92 let cancel_token = CancellationToken::new();
93 let track_config = TrackConfig::default();
94 let active_call = Arc::new(ActiveCall::new(
95 call_type.clone(),
96 cancel_token.clone(),
97 session_id.clone(),
98 app_state.invitation.clone(),
99 app_state.clone(),
100 track_config,
101 Some(audio_receiver),
102 dump_events,
103 server_side_track,
104 None, ));
106
107 {
109 let mut pending = app_state.pending_playbooks.lock().await;
110 if let Some(name) = pending.remove(&session_id) {
111 let path = PathBuf::from("config/playbook").join(&name);
112 match Playbook::load(path).await {
113 Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
114 Ok(runner) => {
115 tokio::spawn(async move {
116 runner.run().await;
117 });
118 info!(session_id, "Playbook runner started for {}", name);
119 }
120 Err(e) => warn!(session_id, "Failed to create runner {}: {}", name, e),
121 },
122 Err(e) => {
123 warn!(session_id, "Failed to load playbook {}: {}", name, e);
124 }
125 }
126 }
127 }
128
129 let recv_from_ws_loop = async {
130 while let Some(Ok(message)) = ws_receiver.next().await {
131 match message {
132 Message::Text(text) => {
133 let command = match serde_json::from_str::<Command>(&text) {
134 Ok(cmd) => cmd,
135 Err(e) => {
136 warn!(session_id, %text, "Failed to parse command {}",e);
137 continue;
138 }
139 };
140 if let Err(_) = active_call.enqueue_command(command).await {
141 break;
142 }
143 }
144 Message::Binary(bin) => {
145 audio_sender.send(bin.into()).ok();
146 }
147 Message::Close(_) => {
148 info!(session_id, "WebSocket closed by client");
149 break;
150 }
151 _ => {}
152 }
153 }
154 };
155
156 let mut event_receiver = active_call.event_sender.subscribe();
157 let send_to_ws_loop = async {
158 while let Ok(event) = event_receiver.recv().await {
159 let message = match event.into_ws_message() {
160 Ok(msg) => msg,
161 Err(_) => continue,
162 };
163 if let Err(_) = ws_sender.send(message).await {
164 break;
165 }
166 }
167 };
168
169 let send_ping_loop = async {
170 if ping_interval == 0 {
171 active_call.cancel_token.cancelled().await;
172 return;
173 }
174 let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval.into()));
175 loop {
176 ticker.tick().await;
177 let payload = Utc::now().to_rfc3339();
178 let event = SessionEvent::Ping {
179 timestamp: crate::media::get_timestamp(),
180 payload: Some(payload),
181 };
182 if let Err(_) = active_call.event_sender.send(event) {
183 break;
184 }
185 }
186 };
187 let guard = ActiveCallGuard::new(active_call.clone());
188 info!(
189 session_id,
190 active_calls = guard.active_calls,
191 ?call_type,
192 "new call started"
193 );
194 let receiver = active_call.new_receiver();
195
196 let (r, _) = join! {
197 active_call.serve(receiver),
198 async {
199 select!{
200 _ = send_ping_loop => {},
201 _ = cancel_token.cancelled() => {},
202 _ = send_to_ws_loop => { },
203 _ = recv_from_ws_loop => {
204 info!(session_id, "WebSocket closed by client");
205 },
206 }
207 cancel_token.cancel();
208 }
209 };
210 match r {
211 Ok(_) => info!(session_id, "call ended successfully"),
212 Err(e) => warn!(session_id, "call ended with error: {}", e),
213 }
214
215 active_call.cleanup().await.ok();
216 while let Ok(event) = event_receiver.try_recv() {
218 let message = match event.into_ws_message() {
219 Ok(msg) => msg,
220 Err(_) => continue,
221 };
222 if let Err(_) = ws_sender.send(message).await {
223 break;
224 }
225 }
226 ws_sender.flush().await.ok();
227 ws_sender.close().await.ok();
228 debug!(session_id, "WebSocket connection closed");
229 });
230 resp
231}
232
233pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
234 if let Some(ice_servers) = state.config.ice_servers.as_ref() {
235 return Json(ice_servers).into_response();
236 }
237 Json(vec![IceServer {
238 urls: vec!["stun:stun.l.google.com:19302".to_string()],
239 ..Default::default()
240 }])
241 .into_response()
242}
243
244pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
245 let calls = state
246 .active_calls
247 .lock()
248 .unwrap()
249 .iter()
250 .map(|(_, c)| {
251 let cs = c.call_state.read().unwrap();
252 json!({
253 "id": c.session_id,
254 "callType": c.call_type,
255 "cs.option": cs.option,
256 "ringTime": cs.ring_time,
257 "startTime": cs.answer_time,
258 })
259 })
260 .collect::<Vec<_>>();
261 Json(serde_json::json!({ "active_calls": calls })).into_response()
262}
263
264pub(crate) async fn kill_active_call(
265 Path(id): Path<String>,
266 State(state): State<AppState>,
267) -> Response {
268 let active_calls = state.active_calls.lock().unwrap();
269 if let Some(call) = active_calls.get(&id) {
270 call.cancel_token.cancel();
271 Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
272 } else {
273 Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
274 }
275}
276
277trait IntoWsMessage {
278 fn into_ws_message(self) -> Result<Message, serde_json::Error>;
279}
280
281impl IntoWsMessage for crate::event::SessionEvent {
282 fn into_ws_message(self) -> Result<Message, serde_json::Error> {
283 match self {
284 SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
285 SessionEvent::Ping { timestamp, payload } => {
286 let payload = payload.unwrap_or_else(|| timestamp.to_string());
287 Ok(Message::Ping(payload.into()))
288 }
289 event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
290 }
291 }
292}