1use crate::{
2 app::AppState,
3 call::{
4 ActiveCall, ActiveCallType, Command,
5 active_call::{ActiveCallGuard, CallParams},
6 },
7 handler::api,
8 playbook::{Playbook, PlaybookRunner},
9};
10use axum::{
11 Router,
12 extract::{Query, State, WebSocketUpgrade, ws::Message},
13 response::Response,
14 routing::get,
15};
16use bytes::Bytes;
17use chrono::Utc;
18use futures::{SinkExt, StreamExt};
19use std::{path::PathBuf, sync::Arc, time::Duration};
20use tokio::{join, select};
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, info, warn};
23use uuid::Uuid;
24use voice_engine::{event::SessionEvent, media::track::TrackConfig};
25
26pub fn call_router() -> Router<AppState> {
27 Router::new()
28 .route("/call", get(ws_handler))
29 .route("/call/webrtc", get(webrtc_handler))
30 .route("/call/sip", get(sip_handler))
31}
32
33pub fn playbook_router() -> Router<AppState> {
34 Router::new()
35 .route("/api/playbooks", get(api::list_playbooks))
36 .route(
37 "/api/playbooks/{name}",
38 get(api::get_playbook).post(api::save_playbook),
39 )
40 .route("/api/playbook/run", axum::routing::post(api::run_playbook))
41 .route("/api/records", get(api::list_records))
42}
43
44pub async fn ws_handler(
45 ws: WebSocketUpgrade,
46 State(state): State<AppState>,
47 Query(params): Query<CallParams>,
48) -> Response {
49 call_handler(ActiveCallType::WebSocket, ws, state, params).await
50}
51
52pub async fn sip_handler(
53 ws: WebSocketUpgrade,
54 State(state): State<AppState>,
55 Query(params): Query<CallParams>,
56) -> Response {
57 call_handler(ActiveCallType::Sip, ws, state, params).await
58}
59
60pub async fn webrtc_handler(
61 ws: WebSocketUpgrade,
62 State(state): State<AppState>,
63 Query(params): Query<CallParams>,
64) -> Response {
65 call_handler(ActiveCallType::Webrtc, ws, state, params).await
66}
67
68pub async fn call_handler(
69 call_type: ActiveCallType,
70 ws: WebSocketUpgrade,
71 app_state: AppState,
72 params: CallParams,
73) -> Response {
74 let session_id = params
75 .id
76 .unwrap_or_else(|| format!("s.{}", Uuid::new_v4().to_string()));
77 let server_side_track = params.server_side_track.clone();
78 let dump_events = params.dump_events.unwrap_or(true);
79 let ping_interval = params.ping_interval.unwrap_or(20);
80
81 let resp = ws.on_upgrade(move |socket| async move {
82 let (mut ws_sender, mut ws_receiver) = socket.split();
83 let (audio_sender, audio_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();
84 let cancel_token = CancellationToken::new();
85 let track_config = TrackConfig::default();
86 let active_call = Arc::new(ActiveCall::new(
87 call_type.clone(),
88 cancel_token.clone(),
89 session_id.clone(),
90 app_state.invitation.clone(),
91 app_state.clone(),
92 track_config,
93 Some(audio_receiver),
94 dump_events,
95 server_side_track,
96 None, ));
98
99 {
101 let mut pending = app_state.pending_playbooks.lock().await;
102 if let Some(name) = pending.remove(&session_id) {
103 let path = PathBuf::from("config/playbook").join(&name);
104 match Playbook::load(path).await {
105 Ok(playbook) => match PlaybookRunner::new(playbook, active_call.clone()) {
106 Ok(runner) => {
107 tokio::spawn(async move {
108 runner.run().await;
109 });
110 info!(session_id, "Playbook runner started for {}", name);
111 }
112 Err(e) => warn!(session_id, "Failed to create runner {}: {}", name, e),
113 },
114 Err(e) => {
115 warn!(session_id, "Failed to load playbook {}: {}", name, e);
116 }
117 }
118 }
119 }
120
121 let recv_from_ws_loop = async {
122 while let Some(Ok(message)) = ws_receiver.next().await {
123 match message {
124 Message::Text(text) => {
125 let command = match serde_json::from_str::<Command>(&text) {
126 Ok(cmd) => cmd,
127 Err(e) => {
128 warn!(session_id, %text, "Failed to parse command {}",e);
129 continue;
130 }
131 };
132 if let Err(_) = active_call.enqueue_command(command).await {
133 break;
134 }
135 }
136 Message::Binary(bin) => {
137 audio_sender.send(bin.into()).ok();
138 }
139 Message::Close(_) => {
140 info!(session_id, "WebSocket closed by client");
141 break;
142 }
143 _ => {}
144 }
145 }
146 };
147
148 let mut event_receiver = active_call.event_sender.subscribe();
149 let send_to_ws_loop = async {
150 while let Ok(event) = event_receiver.recv().await {
151 let message = match event.into_ws_message() {
152 Ok(msg) => msg,
153 Err(_) => continue,
154 };
155 if let Err(_) = ws_sender.send(message).await {
156 break;
157 }
158 }
159 };
160
161 let send_ping_loop = async {
162 if ping_interval == 0 {
163 active_call.cancel_token.cancelled().await;
164 return;
165 }
166 let mut ticker = tokio::time::interval(Duration::from_secs(ping_interval.into()));
167 loop {
168 ticker.tick().await;
169 let payload = Utc::now().to_rfc3339();
170 let event = SessionEvent::Ping {
171 timestamp: voice_engine::media::get_timestamp(),
172 payload: Some(payload),
173 };
174 if let Err(_) = active_call.event_sender.send(event) {
175 break;
176 }
177 }
178 };
179 let guard = ActiveCallGuard::new(active_call.clone());
180 info!(
181 session_id,
182 active_calls = guard.active_calls,
183 ?call_type,
184 "new call started"
185 );
186 let receiver = active_call.new_receiver();
187
188 let (r, _) = join! {
189 active_call.serve(receiver),
190 async {
191 select!{
192 _ = send_ping_loop => {},
193 _ = cancel_token.cancelled() => {},
194 _ = send_to_ws_loop => { },
195 _ = recv_from_ws_loop => {
196 info!(session_id, "WebSocket closed by client");
197 },
198 }
199 cancel_token.cancel();
200 }
201 };
202 match r {
203 Ok(_) => info!(session_id, "call ended successfully"),
204 Err(e) => warn!(session_id, "call ended with error: {}", e),
205 }
206
207 active_call.cleanup().await.ok();
208 while let Ok(event) = event_receiver.try_recv() {
210 let message = match event.into_ws_message() {
211 Ok(msg) => msg,
212 Err(_) => continue,
213 };
214 if let Err(_) = ws_sender.send(message).await {
215 break;
216 }
217 }
218 ws_sender.flush().await.ok();
219 ws_sender.close().await.ok();
220 debug!(session_id, "WebSocket connection closed");
221 });
222 resp
223}
224
225trait IntoWsMessage {
226 fn into_ws_message(self) -> Result<Message, serde_json::Error>;
227}
228
229impl IntoWsMessage for voice_engine::event::SessionEvent {
230 fn into_ws_message(self) -> Result<Message, serde_json::Error> {
231 match self {
232 SessionEvent::Binary { data, .. } => Ok(Message::Binary(data.into())),
233 SessionEvent::Ping { timestamp, payload } => {
234 let payload = payload.unwrap_or_else(|| timestamp.to_string());
235 Ok(Message::Ping(payload.into()))
236 }
237 event => serde_json::to_string(&event).map(|payload| Message::Text(payload.into())),
238 }
239 }
240}