active_call/handler/
handler.rs

1use crate::{
2    app::AppState,
3    call::{
4        ActiveCall, ActiveCallType, Command,
5        active_call::{ActiveCallGuard, CallParams},
6    },
7    handler::api,
8    playbook::{Playbook, PlaybookRunner},
9};
10use axum::{
11    Router,
12    extract::{Query, State, WebSocketUpgrade, ws::Message},
13    response::Response,
14    routing::get,
15};
16use bytes::Bytes;
17use chrono::Utc;
18use futures::{SinkExt, StreamExt};
19use std::{path::PathBuf, sync::Arc, time::Duration};
20use tokio::{join, select};
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, info, warn};
23use uuid::Uuid;
24use voice_engine::{event::SessionEvent, media::track::TrackConfig};
25
26pub fn call_router() -> Router<AppState> {
27    Router::new()
28        .route("/call", get(ws_handler))
29        .route("/call/webrtc", get(webrtc_handler))
30        .route("/call/sip", get(sip_handler))
31}
32
33pub fn playbook_router() -> Router<AppState> {
34    Router::new()
35        .route("/api/playbooks", get(api::list_playbooks))
36        .route(
37            "/api/playbooks/{name}",
38            get(api::get_playbook).post(api::save_playbook),
39        )
40        .route("/api/playbook/run", axum::routing::post(api::run_playbook))
41        .route("/api/records", get(api::list_records))
42}
43
44pub async fn ws_handler(
45    ws: WebSocketUpgrade,
46    State(state): State<AppState>,
47    Query(params): Query<CallParams>,
48) -> Response {
49    call_handler(ActiveCallType::WebSocket, ws, state, params).await
50}
51
52pub async fn sip_handler(
53    ws: WebSocketUpgrade,
54    State(state): State<AppState>,
55    Query(params): Query<CallParams>,
56) -> Response {
57    call_handler(ActiveCallType::Sip, ws, state, params).await
58}
59
60pub async fn webrtc_handler(
61    ws: WebSocketUpgrade,
62    State(state): State<AppState>,
63    Query(params): Query<CallParams>,
64) -> Response {
65    call_handler(ActiveCallType::Webrtc, ws, state, params).await
66}
67
68pub async fn call_handler(
69    call_type: ActiveCallType,
70    ws: WebSocketUpgrade,
71    app_state: AppState,
72    params: CallParams,
73) -> Response {
74    let session_id = params
75        .id
76        .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
77    let server_side_track = params.server_side_track.clone();
78    let dump_events = params.dump_events.unwrap_or(true);
79    let ping_interval = params.ping_interval.unwrap_or(20);
80
81    let resp = ws.on_upgrade(move |socket| async move {
82        let (mut ws_sender, mut ws_receiver) = socket.split();
83        let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
84        let cancel_token = CancellationToken::new();
85        let track_config = TrackConfig::default();
86        let active_call = Arc::new(ActiveCall::new(
87            call_type.clone(),
88            cancel_token.clone(),
89            session_id.clone(),
90            app_state.invitation.clone(),
91            app_state.clone(),
92            track_config,
93            Some(audio_receiver),
94            dump_events,
95            server_side_track,
96            None, // No extra data for now
97        ));
98
99        // Check for pending playbook
100        {
101            let mut pending = app_state.pending_playbooks.lock().await;
102            if let Some(name) = pending.remove(&session_id) {
103                let path = PathBuf::from("config/playbook").join(&name);
104                match Playbook::load(path).await {
105                    Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
106                        Ok(runner) => {
107                            tokio::spawn(async move {
108                                runner.run().await;
109                            });
110                            info!(session_id, "Playbook runner started for {}", name);
111                        }
112                        Err(e) => warn!(session_id, "Failed to create runner {}: {}", name, e),
113                    },
114                    Err(e) => {
115                        warn!(session_id, "Failed to load playbook {}: {}", name, e);
116                    }
117                }
118            }
119        }
120
121        let recv_from_ws_loop = async {
122            while let Some(Ok(message)) = ws_receiver.next().await {
123                match message {
124                    Message::Text(text) => {
125                        let command = match serde_json::from_str::<Command>(&text) {
126                            Ok(cmd) => cmd,
127                            Err(e) => {
128                                warn!(session_id, %text, "Failed to parse command {}",e);
129                                continue;
130                            }
131                        };
132                        if let Err(_) = active_call.enqueue_command(command).await {
133                            break;
134                        }
135                    }
136                    Message::Binary(bin) => {
137                        audio_sender.send(bin.into()).ok();
138                    }
139                    Message::Close(_) => {
140                        info!(session_id, "WebSocket closed by client");
141                        break;
142                    }
143                    _ => {}
144                }
145            }
146        };
147
148        let mut event_receiver = active_call.event_sender.subscribe();
149        let send_to_ws_loop = async {
150            while let Ok(event) = event_receiver.recv().await {
151                let message = match event.into_ws_message() {
152                    Ok(msg) => msg,
153                    Err(_) => continue,
154                };
155                if let Err(_) = ws_sender.send(message).await {
156                    break;
157                }
158            }
159        };
160
161        let send_ping_loop = async {
162            if ping_interval == 0 {
163                active_call.cancel_token.cancelled().await;
164                return;
165            }
166            let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval.into()));
167            loop {
168                ticker.tick().await;
169                let payload = Utc::now().to_rfc3339();
170                let event = SessionEvent::Ping {
171                    timestamp: voice_engine::media::get_timestamp(),
172                    payload: Some(payload),
173                };
174                if let Err(_) = active_call.event_sender.send(event) {
175                    break;
176                }
177            }
178        };
179        let guard = ActiveCallGuard::new(active_call.clone());
180        info!(
181            session_id,
182            active_calls = guard.active_calls,
183            ?call_type,
184            "new call started"
185        );
186        let receiver = active_call.new_receiver();
187
188        let (r, _) = join! {
189            active_call.serve(receiver),
190            async {
191                select!{
192                    _ = send_ping_loop => {},
193                    _ = cancel_token.cancelled() => {},
194                    _ = send_to_ws_loop => { },
195                    _ = recv_from_ws_loop => {
196                        info!(session_id, "WebSocket closed by client");
197                    },
198                }
199                cancel_token.cancel();
200            }
201        };
202        match r {
203            Ok(_) => info!(session_id, "call ended successfully"),
204            Err(e) => warn!(session_id, "call ended with error: {}", e),
205        }
206
207        active_call.cleanup().await.ok();
208        // Drain remaining events
209        while let Ok(event) = event_receiver.try_recv() {
210            let message = match event.into_ws_message() {
211                Ok(msg) => msg,
212                Err(_) => continue,
213            };
214            if let Err(_) = ws_sender.send(message).await {
215                break;
216            }
217        }
218        ws_sender.flush().await.ok();
219        ws_sender.close().await.ok();
220        debug!(session_id, "WebSocket connection closed");
221    });
222    resp
223}
224
225trait IntoWsMessage {
226    fn into_ws_message(self) -> Result<Message, serde_json::Error>;
227}
228
229impl IntoWsMessage for voice_engine::event::SessionEvent {
230    fn into_ws_message(self) -> Result<Message, serde_json::Error> {
231        match self {
232            SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
233            SessionEvent::Ping { timestamp, payload } => {
234                let payload = payload.unwrap_or_else(|| timestamp.to_string());
235                Ok(Message::Ping(payload.into()))
236            }
237            event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
238        }
239    }
240}