active_call/handler/
handler.rs

1use crate::{
2    app::AppState,
3    call::{
4        ActiveCall, ActiveCallType, Command,
5        active_call::{ActiveCallGuard, CallParams},
6    },
7    handler::playbook,
8    playbook::{Playbook, PlaybookRunner},
9};
10use crate::{event::SessionEvent, media::track::TrackConfig};
11use axum::{
12    Json, Router,
13    extract::{Path, Query, State, WebSocketUpgrade, ws::Message},
14    response::{IntoResponse, Response},
15    routing::get,
16};
17use bytes::Bytes;
18use chrono::Utc;
19use futures::{SinkExt, StreamExt};
20use rustrtc::IceServer;
21use serde_json::json;
22use std::{path::PathBuf, sync::Arc, time::Duration};
23use tokio::{join, select};
24use tokio_util::sync::CancellationToken;
25use tracing::{debug, info, warn};
26use uuid::Uuid;
27
28pub fn call_router() -> Router<AppState> {
29    let r = Router::new()
30        .route("/call", get(ws_handler))
31        .route("/call/webrtc", get(webrtc_handler))
32        .route("/call/sip", get(sip_handler))
33        .route("/list", get(list_active_calls))
34        .route("/kill/{id}", get(kill_active_call));
35    r
36}
37
38pub fn iceservers_router() -> Router<AppState> {
39    let r = Router::new();
40    r.route("/iceservers", get(get_iceservers))
41}
42
43pub fn playbook_router() -> Router<AppState> {
44    Router::new()
45        .route("/api/playbooks", get(playbook::list_playbooks))
46        .route(
47            "/api/playbooks/{name}",
48            get(playbook::get_playbook).post(playbook::save_playbook),
49        )
50        .route(
51            "/api/playbook/run",
52            axum::routing::post(playbook::run_playbook),
53        )
54        .route("/api/records", get(playbook::list_records))
55}
56
57pub async fn ws_handler(
58    ws: WebSocketUpgrade,
59    State(state): State<AppState>,
60    Query(params): Query<CallParams>,
61) -> Response {
62    call_handler(ActiveCallType::WebSocket, ws, state, params).await
63}
64
65pub async fn sip_handler(
66    ws: WebSocketUpgrade,
67    State(state): State<AppState>,
68    Query(params): Query<CallParams>,
69) -> Response {
70    call_handler(ActiveCallType::Sip, ws, state, params).await
71}
72
73pub async fn webrtc_handler(
74    ws: WebSocketUpgrade,
75    State(state): State<AppState>,
76    Query(params): Query<CallParams>,
77) -> Response {
78    call_handler(ActiveCallType::Webrtc, ws, state, params).await
79}
80
81/// Core call handling logic that works with either WebSocket or mpsc channels
82pub async fn call_handler_core(
83    call_type: ActiveCallType,
84    session_id: String,
85    app_state: AppState,
86    cancel_token: CancellationToken,
87    audio_receiver: tokio::sync::mpsc::UnboundedReceiver<Bytes>,
88    server_side_track: Option<String>,
89    dump_events: bool,
90    ping_interval: u64,
91    mut command_receiver: tokio::sync::mpsc::UnboundedReceiver<Command>,
92    event_sender_to_client: tokio::sync::mpsc::UnboundedSender<crate::event::SessionEvent>,
93) {
94    let _cancel_guard = cancel_token.clone().drop_guard();
95    let track_config = TrackConfig::default();
96    let active_call = Arc::new(ActiveCall::new(
97        call_type.clone(),
98        cancel_token.clone(),
99        session_id.clone(),
100        app_state.invitation.clone(),
101        app_state.clone(),
102        track_config,
103        Some(audio_receiver),
104        dump_events,
105        server_side_track,
106        None, // No extra data for now
107    ));
108
109    // Check for pending playbook
110    {
111        let mut pending = app_state.pending_playbooks.lock().await;
112        if let Some(name_or_content) = pending.remove(&session_id) {
113            let variables = active_call.call_state.read().await.extras.clone();
114            let playbook_result = if name_or_content.trim().starts_with("---") {
115                Playbook::parse(&name_or_content, variables.as_ref())
116            } else {
117                // If path already contains config/playbook, use it as-is; otherwise prepend it
118                let path = if name_or_content.starts_with("config/playbook/") {
119                    PathBuf::from(&name_or_content)
120                } else {
121                    PathBuf::from("config/playbook").join(&name_or_content)
122                };
123                Playbook::load(path, variables.as_ref()).await
124            };
125
126            match playbook_result {
127                Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
128                    Ok(runner) => {
129                        crate::spawn(async move {
130                            runner.run().await;
131                        });
132                        let display_name = if name_or_content.trim().starts_with("---") {
133                            "custom content"
134                        } else {
135                            &name_or_content
136                        };
137                        info!(session_id, "Playbook runner started for {}", display_name);
138                    }
139                    Err(e) => {
140                        let display_name = if name_or_content.trim().starts_with("---") {
141                            "custom content"
142                        } else {
143                            &name_or_content
144                        };
145                        warn!(
146                            session_id,
147                            "Failed to create runner {}: {}", display_name, e
148                        )
149                    }
150                },
151                Err(e) => {
152                    let display_name = if name_or_content.trim().starts_with("---") {
153                        "custom content"
154                    } else {
155                        &name_or_content
156                    };
157                    warn!(
158                        session_id,
159                        "Failed to load playbook {}: {}", display_name, e
160                    );
161                    let event = SessionEvent::Error {
162                        timestamp: crate::media::get_timestamp(),
163                        track_id: session_id.clone(),
164                        sender: "playbook".to_string(),
165                        error: format!("{}", e),
166                        code: None,
167                    };
168                    event_sender_to_client.send(event).ok();
169                    return;
170                }
171            }
172        }
173    }
174
175    let recv_commands_loop = async {
176        while let Some(command) = command_receiver.recv().await {
177            if let Err(_) = active_call.enqueue_command(command).await {
178                break;
179            }
180        }
181    };
182
183    let mut event_receiver = active_call.event_sender.subscribe();
184    let send_events_loop = async {
185        loop {
186            match event_receiver.recv().await {
187                Ok(event) => {
188                    if let Err(_) = event_sender_to_client.send(event) {
189                        break;
190                    }
191                }
192                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
193                Err(_) => break,
194            }
195        }
196    };
197
198    let send_ping_loop = async {
199        if ping_interval == 0 {
200            active_call.cancel_token.cancelled().await;
201            return;
202        }
203        let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval));
204        loop {
205            ticker.tick().await;
206            let payload = Utc::now().to_rfc3339();
207            let event = SessionEvent::Ping {
208                timestamp: crate::media::get_timestamp(),
209                payload: Some(payload),
210            };
211            if let Err(_) = active_call.event_sender.send(event) {
212                break;
213            }
214        }
215    };
216
217    let guard = ActiveCallGuard::new(active_call.clone());
218    info!(
219        session_id,
220        active_calls = guard.active_calls,
221        ?call_type,
222        "new call started"
223    );
224    let receiver = active_call.new_receiver();
225
226    let (r, _) = join! {
227        active_call.serve(receiver),
228        async {
229            select!{
230                _ = send_ping_loop => {},
231                _ = cancel_token.cancelled() => {},
232                _ = send_events_loop => { },
233                _ = recv_commands_loop => {
234                    info!(session_id, "Command receiver closed");
235                },
236            }
237            cancel_token.cancel();
238        }
239    };
240    match r {
241        Ok(_) => info!(session_id, "call ended successfully"),
242        Err(e) => warn!(session_id, "call ended with error: {}", e),
243    }
244
245    active_call.cleanup().await.ok();
246    debug!(session_id, "Call handler core completed");
247}
248
249pub async fn call_handler(
250    call_type: ActiveCallType,
251    ws: WebSocketUpgrade,
252    app_state: AppState,
253    params: CallParams,
254) -> Response {
255    let session_id = params
256        .id
257        .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
258    let server_side_track = params.server_side_track.clone();
259    let dump_events = params.dump_events.unwrap_or(true);
260    let ping_interval = params.ping_interval.unwrap_or(20);
261
262    let resp = ws.on_upgrade(move |socket| async move {
263        let (mut ws_sender, mut ws_receiver) = socket.split();
264        let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
265        let (command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel::<Command>();
266        let (event_sender_to_client, mut event_receiver_from_core) =
267            tokio::sync::mpsc::unbounded_channel::<crate::event::SessionEvent>();
268        let cancel_token = CancellationToken::new();
269
270        // Start core handler in background
271        let session_id_clone = session_id.clone();
272        let app_state_clone = app_state.clone();
273        let cancel_token_clone = cancel_token.clone();
274        crate::spawn(call_handler_core(
275            call_type,
276            session_id_clone,
277            app_state_clone,
278            cancel_token_clone,
279            audio_receiver,
280            server_side_track,
281            dump_events,
282            ping_interval.into(),
283            command_receiver,
284            event_sender_to_client,
285        ));
286
287        // Handle WebSocket I/O
288        let recv_from_ws_loop = async {
289            while let Some(Ok(message)) = ws_receiver.next().await {
290                match message {
291                    Message::Text(text) => {
292                        let command = match serde_json::from_str::<Command>(&text) {
293                            Ok(cmd) => cmd,
294                            Err(e) => {
295                                warn!(session_id, %text, "Failed to parse command {}", e);
296                                continue;
297                            }
298                        };
299                        if let Err(_) = command_sender.send(command) {
300                            break;
301                        }
302                    }
303                    Message::Binary(bin) => {
304                        audio_sender.send(bin.into()).ok();
305                    }
306                    Message::Close(_) => {
307                        info!(session_id, "WebSocket closed by client");
308                        break;
309                    }
310                    _ => {}
311                }
312            }
313        };
314
315        let send_to_ws_loop = async {
316            while let Some(event) = event_receiver_from_core.recv().await {
317                debug!(session_id, %event, "Sending WS message");
318                let message = match event.into_ws_message() {
319                    Ok(msg) => msg,
320                    Err(e) => {
321                        warn!(session_id, error=%e, "Failed to serialize event to WS message");
322                        continue;
323                    }
324                };
325                if let Err(_) = ws_sender.send(message).await {
326                    info!(session_id, "WebSocket send failed, closing");
327                    break;
328                }
329            }
330        };
331
332        select! {
333            _ = recv_from_ws_loop => {
334                info!(session_id, "WebSocket receive loop ended");
335            },
336            _ = send_to_ws_loop => {
337                info!(session_id, "WebSocket send loop ended");
338            },
339            _ = cancel_token.cancelled() => {
340                info!(session_id, "WebSocket cancelled");
341            },
342        }
343
344        cancel_token.cancel();
345        ws_sender.flush().await.ok();
346        ws_sender.close().await.ok();
347        debug!(session_id, "WebSocket connection closed");
348    });
349    resp
350}
351
352pub(crate) async fn get_iceservers(State(state): State<AppState>) -> Response {
353    if let Some(ice_servers) = state.config.ice_servers.as_ref() {
354        return Json(ice_servers).into_response();
355    }
356    Json(vec![IceServer {
357        urls: vec!["stun:stun.l.google.com:19302".to_string()],
358        ..Default::default()
359    }])
360    .into_response()
361}
362
363pub(crate) async fn list_active_calls(State(state): State<AppState>) -> Response {
364    let calls = state
365        .active_calls
366        .lock()
367        .unwrap()
368        .iter()
369        .map(|(_, c)| {
370            if let Ok(cs) = c.call_state.try_read() {
371                json!({
372                    "id": c.session_id,
373                    "callType": c.call_type,
374                    "cs.option": cs.option,
375                    "ringTime": cs.ring_time,
376                    "startTime": cs.answer_time,
377                })
378            } else {
379                json!({
380                    "id": c.session_id,
381                    "callType": c.call_type,
382                    "status": "locked",
383                })
384            }
385        })
386        .collect::<Vec<_>>();
387    Json(serde_json::json!({ "active_calls": calls })).into_response()
388}
389
390pub(crate) async fn kill_active_call(
391    Path(id): Path<String>,
392    State(state): State<AppState>,
393) -> Response {
394    let active_calls = state.active_calls.lock().unwrap();
395    if let Some(call) = active_calls.get(&id) {
396        call.cancel_token.cancel();
397        Json(serde_json::json!({ "status": "killed", "id": id })).into_response()
398    } else {
399        Json(serde_json::json!({ "status": "not_found", "id": id })).into_response()
400    }
401}
402
403trait IntoWsMessage {
404    fn into_ws_message(self) -> Result<Message, serde_json::Error>;
405}
406
407impl IntoWsMessage for crate::event::SessionEvent {
408    fn into_ws_message(self) -> Result<Message, serde_json::Error> {
409        match self {
410            SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
411            SessionEvent::Ping { timestamp, payload } => {
412                let payload = payload.unwrap_or_else(|| timestamp.to_string());
413                Ok(Message::Ping(payload.into()))
414            }
415            event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
416        }
417    }
418}