1use crate::{
2 app::AppState,
3 call::{
4 ActiveCall, ActiveCallType, Command,
5 active_call::{ActiveCallGuard, CallParams},
6 },
7 handler::playbook,
8 playbook::{Playbook, PlaybookRunner},
9};
10use axum::{
11 Json, Router,
12 extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
13 response::{IntoResponse, Response},
14 routing::get,
15};
16use bytes::Bytes;
17use chrono::Utc;
18use futures::{SinkExt, StreamExt};
19use serde_json::json;
20use std::{path::PathBuf, sync::Arc, time::Duration};
21use tokio::{join, select};
22use tokio_util::sync::CancellationToken;
23use tracing::{debug, info, warn};
24use uuid::Uuid;
25use voice_engine::{IceServer, event::SessionEvent, media::track::TrackConfig};
26
27pub fn call_router() -> Router<AppState> {
28 Router::new()
29 .route("/call", get(ws_handler))
30 .route("/call/webrtc", get(webrtc_handler))
31 .route("/call/sip", get(sip_handler))
32 .route("/iceservers", get(get_iceservers))
33 .route("/list", get(list_active_calls))
34 .route("/kill/{id}", get(kill_active_call))
35}
36
37pub fn playbook_router() -> Router<AppState> {
38 Router::new()
39 .route("/api/playbooks", get(playbook::list_playbooks))
40 .route(
41 "/api/playbooks/{name}",
42 get(playbook::get_playbook).post(playbook::save_playbook),
43 )
44 .route(
45 "/api/playbook/run",
46 axum::routing::post(playbook::run_playbook),
47 )
48 .route("/api/records", get(playbook::list_records))
49}
50
51pub async fn ws_handler(
52 ws: WebSocketUpgrade,
53 State(state): State<AppState>,
54 Query(params): Query<CallParams>,
55) -> Response {
56 call_handler(ActiveCallType::WebSocket, ws, state, params).await
57}
58
59pub async fn sip_handler(
60 ws: WebSocketUpgrade,
61 State(state): State<AppState>,
62 Query(params): Query<CallParams>,
63) -> Response {
64 call_handler(ActiveCallType::Sip, ws, state, params).await
65}
66
67pub async fn webrtc_handler(
68 ws: WebSocketUpgrade,
69 State(state): State<AppState>,
70 Query(params): Query<CallParams>,
71) -> Response {
72 call_handler(ActiveCallType::Webrtc, ws, state, params).await
73}
74
75pub async fn call_handler(
76 call_type: ActiveCallType,
77 ws: WebSocketUpgrade,
78 app_state: AppState,
79 params: CallParams,
80) -> Response {
81 let session_id = params
82 .id
83 .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
84 let server_side_track = params.server_side_track.clone();
85 let dump_events = params.dump_events.unwrap_or(true);
86 let ping_interval = params.ping_interval.unwrap_or(20);
87
88 let resp = ws.on_upgrade(move |socket| async move {
89 let (mut ws_sender, mut ws_receiver) = socket.split();
90 let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
91 let cancel_token = CancellationToken::new();
92 let track_config = TrackConfig::default();
93 let active_call = Arc::new(ActiveCall::new(
94 call_type.clone(),
95 cancel_token.clone(),
96 session_id.clone(),
97 app_state.invitation.clone(),
98 app_state.clone(),
99 track_config,
100 Some(audio_receiver),
101 dump_events,
102 server_side_track,
103 None, ));
105
106 {
108 let mut pending = app_state.pending_playbooks.lock().await;
109 if let Some(name) = pending.remove(&session_id) {
110 let path = PathBuf::from("config/playbook").join(&name);
111 match Playbook::load(path).await {
112 Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
113 Ok(runner) => {
114 tokio::spawn(async move {
115 runner.run().await;
116 });
117 info!(session_id, "Playbook runner started for {}", name);
118 }
119 Err(e) => warn!(session_id, "Failed to create runner {}: {}", name, e),
120 },
121 Err(e) => {
122 warn!(session_id, "Failed to load playbook {}: {}", name, e);
123 }
124 }
125 }
126 }
127
128 let recv_from_ws_loop = async {
129 while let Some(Ok(message)) = ws_receiver.next().await {
130 match message {
131 Message::Text(text) => {
132 let command = match serde_json::from_str::<Command>(&text) {
133 Ok(cmd) => cmd,
134 Err(e) => {
135 warn!(session_id, %text, "Failed to parse command {}",e);
136 continue;
137 }
138 };
139 if let Err(_) = active_call.enqueue_command(command).await {
140 break;
141 }
142 }
143 Message::Binary(bin) => {
144 audio_sender.send(bin.into()).ok();
145 }
146 Message::Close(_) => {
147 info!(session_id, "WebSocket closed by client");
148 break;
149 }
150 _ => {}
151 }
152 }
153 };
154
155 let mut event_receiver = active_call.event_sender.subscribe();
156 let send_to_ws_loop = async {
157 while let Ok(event) = event_receiver.recv().await {
158 let message = match event.into_ws_message() {
159 Ok(msg) => msg,
160 Err(_) => continue,
161 };
162 if let Err(_) = ws_sender.send(message).await {
163 break;
164 }
165 }
166 };
167
168 let send_ping_loop = async {
169 if ping_interval == 0 {
170 active_call.cancel_token.cancelled().await;
171 return;
172 }
173 let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval.into()));
174 loop {
175 ticker.tick().await;
176 let payload = Utc::now().to_rfc3339();
177 let event = SessionEvent::Ping {
178 timestamp: voice_engine::media::get_timestamp(),
179 payload: Some(payload),
180 };
181 if let Err(_) = active_call.event_sender.send(event) {
182 break;
183 }
184 }
185 };
186 let guard = ActiveCallGuard::new(active_call.clone());
187 info!(
188 session_id,
189 active_calls = guard.active_calls,
190 ?call_type,
191 "new call started"
192 );
193 let receiver = active_call.new_receiver();
194
195 let (r, _) = join! {
196 active_call.serve(receiver),
197 async {
198 select!{
199 _ = send_ping_loop => {},
200 _ = cancel_token.cancelled() => {},
201 _ = send_to_ws_loop => { },
202 _ = recv_from_ws_loop => {
203 info!(session_id, "WebSocket closed by client");
204 },
205 }
206 cancel_token.cancel();
207 }
208 };
209 match r {
210 Ok(_) => info!(session_id, "call ended successfully"),
211 Err(e) => warn!(session_id, "call ended with error: {}", e),
212 }
213
214 active_call.cleanup().await.ok();
215 while let Ok(event) = event_receiver.try_recv() {
217 let message = match event.into_ws_message() {
218 Ok(msg) => msg,
219 Err(_) => continue,
220 };
221 if let Err(_) = ws_sender.send(message).await {
222 break;
223 }
224 }
225 ws_sender.flush().await.ok();
226 ws_sender.close().await.ok();
227 debug!(session_id, "WebSocket connection closed");
228 });
229 resp
230}
231
232pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
233 if let Some(ice_servers) = state.config.ice_servers.as_ref() {
234 return Json(ice_servers).into_response();
235 }
236 Json(vec![IceServer {
237 urls: vec!["stun:stun.l.google.com:19302".to_string()],
238 ..Default::default()
239 }])
240 .into_response()
241}
242
243pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
244 let calls = state
245 .active_calls
246 .lock()
247 .unwrap()
248 .iter()
249 .map(|(_, c)| {
250 let cs = c.call_state.read().unwrap();
251 json!({
252 "id": c.session_id,
253 "callType": c.call_type,
254 "cs.option": cs.option,
255 "ringTime": cs.ring_time,
256 "startTime": cs.answer_time,
257 })
258 })
259 .collect::<Vec<_>>();
260 Json(serde_json::json!({ "active_calls": calls })).into_response()
261}
262
263pub(crate) async fn kill_active_call(
264 Path(id): Path<String>,
265 State(state): State<AppState>,
266) -> Response {
267 let active_calls = state.active_calls.lock().unwrap();
268 if let Some(call) = active_calls.get(&id) {
269 call.cancel_token.cancel();
270 Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
271 } else {
272 Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
273 }
274}
275
276trait IntoWsMessage {
277 fn into_ws_message(self) -> Result<Message, serde_json::Error>;
278}
279
280impl IntoWsMessage for voice_engine::event::SessionEvent {
281 fn into_ws_message(self) -> Result<Message, serde_json::Error> {
282 match self {
283 SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
284 SessionEvent::Ping { timestamp, payload } => {
285 let payload = payload.unwrap_or_else(|| timestamp.to_string());
286 Ok(Message::Ping(payload.into()))
287 }
288 event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
289 }
290 }
291}