active_call/handler/
handler.rs

1use crate::{
2    app::AppState,
3    call::{
4        ActiveCall, ActiveCallType, Command,
5        active_call::{ActiveCallGuard, CallParams},
6    },
7    handler::playbook,
8    playbook::{Playbook, PlaybookRunner},
9};
10use axum::{
11    Json, Router,
12    extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
13    response::{IntoResponse, Response},
14    routing::get,
15};
16use bytes::Bytes;
17use chrono::Utc;
18use futures::{SinkExt, StreamExt};
19use serde_json::json;
20use std::{path::PathBuf, sync::Arc, time::Duration};
21use tokio::{join, select};
22use tokio_util::sync::CancellationToken;
23use tracing::{debug, info, warn};
24use uuid::Uuid;
25use voice_engine::{IceServer, event::SessionEvent, media::track::TrackConfig};
26
27pub fn call_router() -> Router<AppState> {
28    Router::new()
29        .route("/call", get(ws_handler))
30        .route("/call/webrtc", get(webrtc_handler))
31        .route("/call/sip", get(sip_handler))
32        .route("/iceservers", get(get_iceservers))
33        .route("/list", get(list_active_calls))
34        .route("/kill/{id}", get(kill_active_call))
35}
36
37pub fn playbook_router() -> Router<AppState> {
38    Router::new()
39        .route("/api/playbooks", get(playbook::list_playbooks))
40        .route(
41            "/api/playbooks/{name}",
42            get(playbook::get_playbook).post(playbook::save_playbook),
43        )
44        .route(
45            "/api/playbook/run",
46            axum::routing::post(playbook::run_playbook),
47        )
48        .route("/api/records", get(playbook::list_records))
49}
50
51pub async fn ws_handler(
52    ws: WebSocketUpgrade,
53    State(state): State<AppState>,
54    Query(params): Query<CallParams>,
55) -> Response {
56    call_handler(ActiveCallType::WebSocket, ws, state, params).await
57}
58
59pub async fn sip_handler(
60    ws: WebSocketUpgrade,
61    State(state): State<AppState>,
62    Query(params): Query<CallParams>,
63) -> Response {
64    call_handler(ActiveCallType::Sip, ws, state, params).await
65}
66
67pub async fn webrtc_handler(
68    ws: WebSocketUpgrade,
69    State(state): State<AppState>,
70    Query(params): Query<CallParams>,
71) -> Response {
72    call_handler(ActiveCallType::Webrtc, ws, state, params).await
73}
74
75pub async fn call_handler(
76    call_type: ActiveCallType,
77    ws: WebSocketUpgrade,
78    app_state: AppState,
79    params: CallParams,
80) -> Response {
81    let session_id = params
82        .id
83        .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
84    let server_side_track = params.server_side_track.clone();
85    let dump_events = params.dump_events.unwrap_or(true);
86    let ping_interval = params.ping_interval.unwrap_or(20);
87
88    let resp = ws.on_upgrade(move |socket| async move {
89        let (mut ws_sender, mut ws_receiver) = socket.split();
90        let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
91        let cancel_token = CancellationToken::new();
92        let track_config = TrackConfig::default();
93        let active_call = Arc::new(ActiveCall::new(
94            call_type.clone(),
95            cancel_token.clone(),
96            session_id.clone(),
97            app_state.invitation.clone(),
98            app_state.clone(),
99            track_config,
100            Some(audio_receiver),
101            dump_events,
102            server_side_track,
103            None, // No extra data for now
104        ));
105
106        // Check for pending playbook
107        {
108            let mut pending = app_state.pending_playbooks.lock().await;
109            if let Some(name) = pending.remove(&session_id) {
110                let path = PathBuf::from("config/playbook").join(&name);
111                match Playbook::load(path).await {
112                    Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
113                        Ok(runner) => {
114                            tokio::spawn(async move {
115                                runner.run().await;
116                            });
117                            info!(session_id, "Playbook runner started for {}", name);
118                        }
119                        Err(e) => warn!(session_id, "Failed to create runner {}: {}", name, e),
120                    },
121                    Err(e) => {
122                        warn!(session_id, "Failed to load playbook {}: {}", name, e);
123                    }
124                }
125            }
126        }
127
128        let recv_from_ws_loop = async {
129            while let Some(Ok(message)) = ws_receiver.next().await {
130                match message {
131                    Message::Text(text) => {
132                        let command = match serde_json::from_str::<Command>(&text) {
133                            Ok(cmd) => cmd,
134                            Err(e) => {
135                                warn!(session_id, %text, "Failed to parse command {}",e);
136                                continue;
137                            }
138                        };
139                        if let Err(_) = active_call.enqueue_command(command).await {
140                            break;
141                        }
142                    }
143                    Message::Binary(bin) => {
144                        audio_sender.send(bin.into()).ok();
145                    }
146                    Message::Close(_) => {
147                        info!(session_id, "WebSocket closed by client");
148                        break;
149                    }
150                    _ => {}
151                }
152            }
153        };
154
155        let mut event_receiver = active_call.event_sender.subscribe();
156        let send_to_ws_loop = async {
157            while let Ok(event) = event_receiver.recv().await {
158                let message = match event.into_ws_message() {
159                    Ok(msg) => msg,
160                    Err(_) => continue,
161                };
162                if let Err(_) = ws_sender.send(message).await {
163                    break;
164                }
165            }
166        };
167
168        let send_ping_loop = async {
169            if ping_interval == 0 {
170                active_call.cancel_token.cancelled().await;
171                return;
172            }
173            let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval.into()));
174            loop {
175                ticker.tick().await;
176                let payload = Utc::now().to_rfc3339();
177                let event = SessionEvent::Ping {
178                    timestamp: voice_engine::media::get_timestamp(),
179                    payload: Some(payload),
180                };
181                if let Err(_) = active_call.event_sender.send(event) {
182                    break;
183                }
184            }
185        };
186        let guard = ActiveCallGuard::new(active_call.clone());
187        info!(
188            session_id,
189            active_calls = guard.active_calls,
190            ?call_type,
191            "new call started"
192        );
193        let receiver = active_call.new_receiver();
194
195        let (r, _) = join! {
196            active_call.serve(receiver),
197            async {
198                select!{
199                    _ = send_ping_loop => {},
200                    _ = cancel_token.cancelled() => {},
201                    _ = send_to_ws_loop => { },
202                    _ = recv_from_ws_loop => {
203                        info!(session_id, "WebSocket closed by client");
204                    },
205                }
206                cancel_token.cancel();
207            }
208        };
209        match r {
210            Ok(_) => info!(session_id, "call ended successfully"),
211            Err(e) => warn!(session_id, "call ended with error: {}", e),
212        }
213
214        active_call.cleanup().await.ok();
215        // Drain remaining events
216        while let Ok(event) = event_receiver.try_recv() {
217            let message = match event.into_ws_message() {
218                Ok(msg) => msg,
219                Err(_) => continue,
220            };
221            if let Err(_) = ws_sender.send(message).await {
222                break;
223            }
224        }
225        ws_sender.flush().await.ok();
226        ws_sender.close().await.ok();
227        debug!(session_id, "WebSocket connection closed");
228    });
229    resp
230}
231
232pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
233    if let Some(ice_servers) = state.config.ice_servers.as_ref() {
234        return Json(ice_servers).into_response();
235    }
236    Json(vec![IceServer {
237        urls: vec!["stun:stun.l.google.com:19302".to_string()],
238        ..Default::default()
239    }])
240    .into_response()
241}
242
243pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
244    let calls = state
245        .active_calls
246        .lock()
247        .unwrap()
248        .iter()
249        .map(|(_, c)| {
250            let cs = c.call_state.read().unwrap();
251            json!({
252                "id": c.session_id,
253                "callType": c.call_type,
254                "cs.option": cs.option,
255                "ringTime": cs.ring_time,
256                "startTime": cs.answer_time,
257            })
258        })
259        .collect::<Vec<_>>();
260    Json(serde_json::json!({ "active_calls": calls })).into_response()
261}
262
263pub(crate) async fn kill_active_call(
264    Path(id): Path<String>,
265    State(state): State<AppState>,
266) -> Response {
267    let active_calls = state.active_calls.lock().unwrap();
268    if let Some(call) = active_calls.get(&id) {
269        call.cancel_token.cancel();
270        Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
271    } else {
272        Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
273    }
274}
275
276trait IntoWsMessage {
277    fn into_ws_message(self) -> Result<Message, serde_json::Error>;
278}
279
280impl IntoWsMessage for voice_engine::event::SessionEvent {
281    fn into_ws_message(self) -> Result<Message, serde_json::Error> {
282        match self {
283            SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
284            SessionEvent::Ping { timestamp, payload } => {
285                let payload = payload.unwrap_or_else(|| timestamp.to_string());
286                Ok(Message::Ping(payload.into()))
287            }
288            event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
289        }
290    }
291}