active_call/handler/
handler.rs

1use crate::{
2    app::AppState,
3    call::{
4        ActiveCall, ActiveCallType, Command,
5        active_call::{ActiveCallGuard, CallParams},
6    },
7    handler::playbook,
8    playbook::{Playbook, PlaybookRunner},
9};
10use crate::{event::SessionEvent, media::track::TrackConfig};
11use axum::{
12    Json, Router,
13    extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
14    response::{IntoResponse, Response},
15    routing::get,
16};
17use bytes::Bytes;
18use chrono::Utc;
19use futures::{SinkExt, StreamExt};
20use rustrtc::IceServer;
21use serde_json::json;
22use std::{path::PathBuf, sync::Arc, time::Duration};
23use tokio::{join, select};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, warn};
26use uuid::Uuid;
27
28pub fn call_router() -> Router<AppState> {
29    Router::new()
30        .route("/call", get(ws_handler))
31        .route("/call/webrtc", get(webrtc_handler))
32        .route("/call/sip", get(sip_handler))
33        .route("/iceservers", get(get_iceservers))
34        .route("/list", get(list_active_calls))
35        .route("/kill/{id}", get(kill_active_call))
36}
37
38pub fn playbook_router() -> Router<AppState> {
39    Router::new()
40        .route("/api/playbooks", get(playbook::list_playbooks))
41        .route(
42            "/api/playbooks/{name}",
43            get(playbook::get_playbook).post(playbook::save_playbook),
44        )
45        .route(
46            "/api/playbook/run",
47            axum::routing::post(playbook::run_playbook),
48        )
49        .route("/api/records", get(playbook::list_records))
50}
51
52pub async fn ws_handler(
53    ws: WebSocketUpgrade,
54    State(state): State<AppState>,
55    Query(params): Query<CallParams>,
56) -> Response {
57    call_handler(ActiveCallType::WebSocket, ws, state, params).await
58}
59
60pub async fn sip_handler(
61    ws: WebSocketUpgrade,
62    State(state): State<AppState>,
63    Query(params): Query<CallParams>,
64) -> Response {
65    call_handler(ActiveCallType::Sip, ws, state, params).await
66}
67
68pub async fn webrtc_handler(
69    ws: WebSocketUpgrade,
70    State(state): State<AppState>,
71    Query(params): Query<CallParams>,
72) -> Response {
73    call_handler(ActiveCallType::Webrtc, ws, state, params).await
74}
75
76pub async fn call_handler(
77    call_type: ActiveCallType,
78    ws: WebSocketUpgrade,
79    app_state: AppState,
80    params: CallParams,
81) -> Response {
82    let session_id = params
83        .id
84        .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
85    let server_side_track = params.server_side_track.clone();
86    let dump_events = params.dump_events.unwrap_or(true);
87    let ping_interval = params.ping_interval.unwrap_or(20);
88
89    let resp = ws.on_upgrade(move |socket| async move {
90        let (mut ws_sender, mut ws_receiver) = socket.split();
91        let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
92        let cancel_token = CancellationToken::new();
93        let track_config = TrackConfig::default();
94        let active_call = Arc::new(ActiveCall::new(
95            call_type.clone(),
96            cancel_token.clone(),
97            session_id.clone(),
98            app_state.invitation.clone(),
99            app_state.clone(),
100            track_config,
101            Some(audio_receiver),
102            dump_events,
103            server_side_track,
104            None, // No extra data for now
105        ));
106
107        // Check for pending playbook
108        {
109            let mut pending = app_state.pending_playbooks.lock().await;
110            if let Some(name) = pending.remove(&session_id) {
111                let path = PathBuf::from("config/playbook").join(&name);
112                match Playbook::load(path).await {
113                    Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
114                        Ok(runner) => {
115                            tokio::spawn(async move {
116                                runner.run().await;
117                            });
118                            info!(session_id, "Playbook runner started for {}", name);
119                        }
120                        Err(e) => warn!(session_id, "Failed to create runner {}: {}", name, e),
121                    },
122                    Err(e) => {
123                        warn!(session_id, "Failed to load playbook {}: {}", name, e);
124                    }
125                }
126            }
127        }
128
129        let recv_from_ws_loop = async {
130            while let Some(Ok(message)) = ws_receiver.next().await {
131                match message {
132                    Message::Text(text) => {
133                        let command = match serde_json::from_str::<Command>(&text) {
134                            Ok(cmd) => cmd,
135                            Err(e) => {
136                                warn!(session_id, %text, "Failed to parse command {}",e);
137                                continue;
138                            }
139                        };
140                        if let Err(_) = active_call.enqueue_command(command).await {
141                            break;
142                        }
143                    }
144                    Message::Binary(bin) => {
145                        audio_sender.send(bin.into()).ok();
146                    }
147                    Message::Close(_) => {
148                        info!(session_id, "WebSocket closed by client");
149                        break;
150                    }
151                    _ => {}
152                }
153            }
154        };
155
156        let mut event_receiver = active_call.event_sender.subscribe();
157        let send_to_ws_loop = async {
158            while let Ok(event) = event_receiver.recv().await {
159                let message = match event.into_ws_message() {
160                    Ok(msg) => msg,
161                    Err(_) => continue,
162                };
163                if let Err(_) = ws_sender.send(message).await {
164                    break;
165                }
166            }
167        };
168
169        let send_ping_loop = async {
170            if ping_interval == 0 {
171                active_call.cancel_token.cancelled().await;
172                return;
173            }
174            let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval.into()));
175            loop {
176                ticker.tick().await;
177                let payload = Utc::now().to_rfc3339();
178                let event = SessionEvent::Ping {
179                    timestamp: crate::media::get_timestamp(),
180                    payload: Some(payload),
181                };
182                if let Err(_) = active_call.event_sender.send(event) {
183                    break;
184                }
185            }
186        };
187        let guard = ActiveCallGuard::new(active_call.clone());
188        info!(
189            session_id,
190            active_calls = guard.active_calls,
191            ?call_type,
192            "new call started"
193        );
194        let receiver = active_call.new_receiver();
195
196        let (r, _) = join! {
197            active_call.serve(receiver),
198            async {
199                select!{
200                    _ = send_ping_loop => {},
201                    _ = cancel_token.cancelled() => {},
202                    _ = send_to_ws_loop => { },
203                    _ = recv_from_ws_loop => {
204                        info!(session_id, "WebSocket closed by client");
205                    },
206                }
207                cancel_token.cancel();
208            }
209        };
210        match r {
211            Ok(_) => info!(session_id, "call ended successfully"),
212            Err(e) => warn!(session_id, "call ended with error: {}", e),
213        }
214
215        active_call.cleanup().await.ok();
216        // Drain remaining events
217        while let Ok(event) = event_receiver.try_recv() {
218            let message = match event.into_ws_message() {
219                Ok(msg) => msg,
220                Err(_) => continue,
221            };
222            if let Err(_) = ws_sender.send(message).await {
223                break;
224            }
225        }
226        ws_sender.flush().await.ok();
227        ws_sender.close().await.ok();
228        debug!(session_id, "WebSocket connection closed");
229    });
230    resp
231}
232
233pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
234    if let Some(ice_servers) = state.config.ice_servers.as_ref() {
235        return Json(ice_servers).into_response();
236    }
237    Json(vec![IceServer {
238        urls: vec!["stun:stun.l.google.com:19302".to_string()],
239        ..Default::default()
240    }])
241    .into_response()
242}
243
244pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
245    let calls = state
246        .active_calls
247        .lock()
248        .unwrap()
249        .iter()
250        .map(|(_, c)| {
251            let cs = c.call_state.read().unwrap();
252            json!({
253                "id": c.session_id,
254                "callType": c.call_type,
255                "cs.option": cs.option,
256                "ringTime": cs.ring_time,
257                "startTime": cs.answer_time,
258            })
259        })
260        .collect::<Vec<_>>();
261    Json(serde_json::json!({ "active_calls": calls })).into_response()
262}
263
264pub(crate) async fn kill_active_call(
265    Path(id): Path<String>,
266    State(state): State<AppState>,
267) -> Response {
268    let active_calls = state.active_calls.lock().unwrap();
269    if let Some(call) = active_calls.get(&id) {
270        call.cancel_token.cancel();
271        Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
272    } else {
273        Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
274    }
275}
276
277trait IntoWsMessage {
278    fn into_ws_message(self) -> Result<Message, serde_json::Error>;
279}
280
281impl IntoWsMessage for crate::event::SessionEvent {
282    fn into_ws_message(self) -> Result<Message, serde_json::Error> {
283        match self {
284            SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
285            SessionEvent::Ping { timestamp, payload } => {
286                let payload = payload.unwrap_or_else(|| timestamp.to_string());
287                Ok(Message::Ping(payload.into()))
288            }
289            event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
290        }
291    }
292}