active_call/handler/
handler.rs

1use crate::{
2    app::AppState,
3    call::{
4        ActiveCall, ActiveCallType, Command,
5        active_call::{ActiveCallGuard, CallParams},
6    },
7    handler::playbook,
8    playbook::{Playbook, PlaybookRunner},
9};
10use crate::{event::SessionEvent, media::track::TrackConfig};
11use axum::{
12    Json, Router,
13    extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
14    response::{IntoResponse, Response},
15    routing::get,
16};
17use bytes::Bytes;
18use chrono::Utc;
19use futures::{SinkExt, StreamExt};
20use rustrtc::IceServer;
21use serde_json::json;
22use std::{path::PathBuf, sync::Arc, time::Duration};
23use tokio::{join, select};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, warn};
26use uuid::Uuid;
27
28pub fn call_router() -> Router<AppState> {
29    let r = Router::new()
30        .route("/call", get(ws_handler))
31        .route("/call/webrtc", get(webrtc_handler))
32        .route("/call/sip", get(sip_handler))
33        .route("/list", get(list_active_calls))
34        .route("/kill/{id}", get(kill_active_call));
35    r
36}
37
38pub fn iceservers_router() -> Router<AppState> {
39    let r = Router::new();
40    r.route("/iceservers", get(get_iceservers))
41}
42
43pub fn playbook_router() -> Router<AppState> {
44    Router::new()
45        .route("/api/playbooks", get(playbook::list_playbooks))
46        .route(
47            "/api/playbooks/{name}",
48            get(playbook::get_playbook).post(playbook::save_playbook),
49        )
50        .route(
51            "/api/playbook/run",
52            axum::routing::post(playbook::run_playbook),
53        )
54        .route("/api/records", get(playbook::list_records))
55}
56
57pub async fn ws_handler(
58    ws: WebSocketUpgrade,
59    State(state): State<AppState>,
60    Query(params): Query<CallParams>,
61) -> Response {
62    call_handler(ActiveCallType::WebSocket, ws, state, params).await
63}
64
65pub async fn sip_handler(
66    ws: WebSocketUpgrade,
67    State(state): State<AppState>,
68    Query(params): Query<CallParams>,
69) -> Response {
70    call_handler(ActiveCallType::Sip, ws, state, params).await
71}
72
73pub async fn webrtc_handler(
74    ws: WebSocketUpgrade,
75    State(state): State<AppState>,
76    Query(params): Query<CallParams>,
77) -> Response {
78    call_handler(ActiveCallType::Webrtc, ws, state, params).await
79}
80
81pub async fn call_handler(
82    call_type: ActiveCallType,
83    ws: WebSocketUpgrade,
84    app_state: AppState,
85    params: CallParams,
86) -> Response {
87    let session_id = params
88        .id
89        .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
90    let server_side_track = params.server_side_track.clone();
91    let dump_events = params.dump_events.unwrap_or(true);
92    let ping_interval = params.ping_interval.unwrap_or(20);
93
94    let resp = ws.on_upgrade(move |socket| async move {
95        let (mut ws_sender, mut ws_receiver) = socket.split();
96        let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
97        let cancel_token = CancellationToken::new();
98        let track_config = TrackConfig::default();
99        let active_call = Arc::new(ActiveCall::new(
100            call_type.clone(),
101            cancel_token.clone(),
102            session_id.clone(),
103            app_state.invitation.clone(),
104            app_state.clone(),
105            track_config,
106            Some(audio_receiver),
107            dump_events,
108            server_side_track,
109            None, // No extra data for now
110        ));
111
112        // Check for pending playbook
113        {
114            let mut pending = app_state.pending_playbooks.lock().await;
115            if let Some(name_or_content) = pending.remove(&session_id) {
116                let variables = active_call.call_state.read().await.extras.clone();
117                let playbook_result = if name_or_content.trim().starts_with("---") {
118                    Playbook::parse(&name_or_content, variables.as_ref())
119                } else {
120                    let path = PathBuf::from("config/playbook").join(&name_or_content);
121                    Playbook::load(path, variables.as_ref()).await
122                };
123
124                match playbook_result {
125                    Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
126                        Ok(runner) => {
127                            crate::spawn(async move {
128                                runner.run().await;
129                            });
130                            let display_name = if name_or_content.trim().starts_with("---") {
131                                "custom content"
132                            } else {
133                                &name_or_content
134                            };
135                            info!(session_id, "Playbook runner started for {}", display_name);
136                        }
137                        Err(e) => {
138                            let display_name = if name_or_content.trim().starts_with("---") {
139                                "custom content"
140                            } else {
141                                &name_or_content
142                            };
143                            warn!(
144                                session_id,
145                                "Failed to create runner {}: {}", display_name, e
146                            )
147                        }
148                    },
149                    Err(e) => {
150                        let display_name = if name_or_content.trim().starts_with("---") {
151                            "custom content"
152                        } else {
153                            &name_or_content
154                        };
155                        warn!(
156                            session_id,
157                            "Failed to load playbook {}: {}", display_name, e
158                        );
159                        let event = SessionEvent::Error {
160                            timestamp: crate::media::get_timestamp(),
161                            track_id: session_id,
162                            sender: "playbook".to_string(),
163                            error: format!("{}", e),
164                            code: None,
165                        };
166                        let message = match event.into_ws_message() {
167                            Ok(msg) => msg,
168                            Err(_) => return,
169                        };
170                        ws_sender.send(message).await.ok();
171                        return;
172                    }
173                }
174            }
175        }
176
177        let recv_from_ws_loop = async {
178            while let Some(Ok(message)) = ws_receiver.next().await {
179                match message {
180                    Message::Text(text) => {
181                        let command = match serde_json::from_str::<Command>(&text) {
182                            Ok(cmd) => cmd,
183                            Err(e) => {
184                                warn!(session_id, %text, "Failed to parse command {}",e);
185                                continue;
186                            }
187                        };
188                        if let Err(_) = active_call.enqueue_command(command).await {
189                            break;
190                        }
191                    }
192                    Message::Binary(bin) => {
193                        audio_sender.send(bin.into()).ok();
194                    }
195                    Message::Close(_) => {
196                        info!(session_id, "WebSocket closed by client");
197                        break;
198                    }
199                    _ => {}
200                }
201            }
202        };
203
204        let mut event_receiver = active_call.event_sender.subscribe();
205        let send_to_ws_loop = async {
206            loop {
207                match event_receiver.recv().await {
208                    Ok(event) => {
209                        let message = match event.into_ws_message() {
210                            Ok(msg) => msg,
211                            Err(_) => continue,
212                        };
213                        if let Err(_) = ws_sender.send(message).await {
214                            break;
215                        }
216                    }
217                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
218                    Err(_) => break,
219                }
220            }
221        };
222
223        let send_ping_loop = async {
224            if ping_interval == 0 {
225                active_call.cancel_token.cancelled().await;
226                return;
227            }
228            let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval.into()));
229            loop {
230                ticker.tick().await;
231                let payload = Utc::now().to_rfc3339();
232                let event = SessionEvent::Ping {
233                    timestamp: crate::media::get_timestamp(),
234                    payload: Some(payload),
235                };
236                if let Err(_) = active_call.event_sender.send(event) {
237                    break;
238                }
239            }
240        };
241        let guard = ActiveCallGuard::new(active_call.clone());
242        info!(
243            session_id,
244            active_calls = guard.active_calls,
245            ?call_type,
246            "new call started"
247        );
248        let receiver = active_call.new_receiver();
249
250        let (r, _) = join! {
251            active_call.serve(receiver),
252            async {
253                select!{
254                    _ = send_ping_loop => {},
255                    _ = cancel_token.cancelled() => {},
256                    _ = send_to_ws_loop => { },
257                    _ = recv_from_ws_loop => {
258                        info!(session_id, "WebSocket closed by client");
259                    },
260                }
261                cancel_token.cancel();
262            }
263        };
264        match r {
265            Ok(_) => info!(session_id, "call ended successfully"),
266            Err(e) => warn!(session_id, "call ended with error: {}", e),
267        }
268
269        active_call.cleanup().await.ok();
270        // Drain remaining events
271        while let Ok(event) = event_receiver.try_recv() {
272            let message = match event.into_ws_message() {
273                Ok(msg) => msg,
274                Err(_) => continue,
275            };
276            if let Err(_) = ws_sender.send(message).await {
277                break;
278            }
279        }
280        ws_sender.flush().await.ok();
281        ws_sender.close().await.ok();
282        debug!(session_id, "WebSocket connection closed");
283    });
284    resp
285}
286
287pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
288    if let Some(ice_servers) = state.config.ice_servers.as_ref() {
289        return Json(ice_servers).into_response();
290    }
291    Json(vec![IceServer {
292        urls: vec!["stun:stun.l.google.com:19302".to_string()],
293        ..Default::default()
294    }])
295    .into_response()
296}
297
298pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
299    let calls = state
300        .active_calls
301        .lock()
302        .unwrap()
303        .iter()
304        .map(|(_, c)| {
305            if let Ok(cs) = c.call_state.try_read() {
306                json!({
307                    "id": c.session_id,
308                    "callType": c.call_type,
309                    "cs.option": cs.option,
310                    "ringTime": cs.ring_time,
311                    "startTime": cs.answer_time,
312                })
313            } else {
314                json!({
315                    "id": c.session_id,
316                    "callType": c.call_type,
317                    "status": "locked",
318                })
319            }
320        })
321        .collect::<Vec<_>>();
322    Json(serde_json::json!({ "active_calls": calls })).into_response()
323}
324
325pub(crate) async fn kill_active_call(
326    Path(id): Path<String>,
327    State(state): State<AppState>,
328) -> Response {
329    let active_calls = state.active_calls.lock().unwrap();
330    if let Some(call) = active_calls.get(&id) {
331        call.cancel_token.cancel();
332        Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
333    } else {
334        Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
335    }
336}
337
338trait IntoWsMessage {
339    fn into_ws_message(self) -> Result<Message, serde_json::Error>;
340}
341
342impl IntoWsMessage for crate::event::SessionEvent {
343    fn into_ws_message(self) -> Result<Message, serde_json::Error> {
344        match self {
345            SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
346            SessionEvent::Ping { timestamp, payload } => {
347                let payload = payload.unwrap_or_else(|| timestamp.to_string());
348                Ok(Message::Ping(payload.into()))
349            }
350            event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
351        }
352    }
353}