1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 useragent::{
8 RegisterOption,
9 invitation::FnCreateInvitationHandler,
10 invitation::{PendingDialog, PendingDialogGuard, default_create_invite_handler},
11 registration::RegistrationHandle,
12 },
13};
14
15use anyhow::Result;
16use chrono::{DateTime, Utc};
17use humantime::parse_duration;
18use rsip::prelude::HeadersExt;
19use rsipstack::dialog::dialog_layer::DialogLayer;
20use rsipstack::transaction::{Endpoint, TransactionReceiver};
21use std::collections::HashSet;
22use std::str::FromStr;
23use std::sync::{Arc, RwLock};
24use std::time::Duration;
25use std::{collections::HashMap, net::SocketAddr};
26use std::{path::Path, sync::atomic::AtomicU64};
27use tokio::select;
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{info, warn};
31use voice_engine::media::{cache::set_cache_dir, engine::StreamEngine};
32
33pub struct AppStateInner {
34 pub config: Arc<Config>,
35 pub token: CancellationToken,
36 pub stream_engine: Arc<StreamEngine>,
37 pub callrecord_sender: Option<CallRecordSender>,
38 pub endpoint: Endpoint,
39 pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
40 pub alive_users: Arc<RwLock<HashSet<String>>>,
41 pub dialog_layer: Arc<DialogLayer>,
42 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
43 pub invitation: Invitation,
44 pub routing_state: Arc<crate::call::RoutingState>,
45 pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
46
47 pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
48 pub total_calls: AtomicU64,
49 pub total_failed_calls: AtomicU64,
50}
51
52pub type AppState = Arc<AppStateInner>;
53
54pub struct AppStateBuilder {
55 pub config: Option<Config>,
56 pub stream_engine: Option<Arc<StreamEngine>>,
57 pub callrecord_sender: Option<CallRecordSender>,
58 pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
59 pub cancel_token: Option<CancellationToken>,
60 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
61 pub config_loaded_at: Option<DateTime<Utc>>,
62 pub config_path: Option<String>,
63}
64
65impl AppStateInner {
66 pub fn get_dump_events_file(&self, session_id: &String) -> String {
67 let recorder_root = self.config.recorder_path();
68 let root = Path::new(&recorder_root);
69 if !root.exists() {
70 match std::fs::create_dir_all(root) {
71 Ok(_) => {
72 info!("created dump events root: {}", root.to_string_lossy());
73 }
74 Err(e) => {
75 warn!(
76 "Failed to create dump events root: {} {}",
77 e,
78 root.to_string_lossy()
79 );
80 }
81 }
82 }
83 root.join(format!("{}.events.jsonl", session_id))
84 .to_string_lossy()
85 .to_string()
86 }
87
88 pub fn get_recorder_file(&self, session_id: &String) -> String {
89 let recorder_root = self.config.recorder_path();
90 let root = Path::new(&recorder_root);
91 if !root.exists() {
92 match std::fs::create_dir_all(root) {
93 Ok(_) => {
94 info!("created recorder root: {}", root.to_string_lossy());
95 }
96 Err(e) => {
97 warn!(
98 "Failed to create recorder root: {} {}",
99 e,
100 root.to_string_lossy()
101 );
102 }
103 }
104 }
105 let mut recorder_file = root.join(session_id);
106 let desired_ext = self.config.recorder_format().extension();
107 let has_desired_ext = recorder_file
108 .extension()
109 .and_then(|ext| ext.to_str())
110 .map(|ext| ext.eq_ignore_ascii_case(desired_ext))
111 .unwrap_or(false);
112
113 if !has_desired_ext {
114 recorder_file.set_extension(desired_ext);
116 }
117
118 recorder_file.to_string_lossy().to_string()
119 }
120
121 pub async fn serve(self: Arc<Self>) -> Result<()> {
122 let incoming_txs = self.endpoint.incoming_transactions()?;
123 let token = self.token.child_token();
124 let endpoint_inner = self.endpoint.inner.clone();
125 let dialog_layer = self.dialog_layer.clone();
126
127 match self.start_registration().await {
128 Ok(count) => {
129 info!("registration started, count: {}", count);
130 }
131 Err(e) => {
132 warn!("failed to start registration: {:?}", e);
133 }
134 }
135
136 tokio::select! {
137 _ = token.cancelled() => {
138 info!("cancelled");
139 }
140 result = endpoint_inner.serve() => {
141 if let Err(e) = result {
142 info!("endpoint serve error: {:?}", e);
143 }
144 }
145 result = self.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
146 if let Err(e) = result {
147 info!("process incoming request error: {:?}", e);
148 }
149 },
150 }
151
152 let timeout = self
155 .config
156 .graceful_shutdown
157 .map(|_| Duration::from_secs(50));
158
159 match self.stop_registration(timeout).await {
160 Ok(_) => {
161 info!("registration stopped, waiting for clear");
162 }
163 Err(e) => {
164 warn!("failed to stop registration: {:?}", e);
165 }
166 }
167 info!("stopping");
168 Ok(())
169 }
170
171 async fn process_incoming_request(
172 &self,
173 dialog_layer: Arc<DialogLayer>,
174 mut incoming: TransactionReceiver,
175 ) -> Result<()> {
176 while let Some(mut tx) = incoming.recv().await {
177 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
178 info!(?key, "received transaction");
179 if tx.original.to_header()?.tag()?.as_ref().is_some() {
180 match dialog_layer.match_dialog(&tx.original) {
181 Some(mut d) => {
182 tokio::spawn(async move {
183 match d.handle(&mut tx).await {
184 Ok(_) => (),
185 Err(e) => {
186 info!("error handling transaction: {:?}", e);
187 }
188 }
189 });
190 continue;
191 }
192 None => {
193 info!("dialog not found: {}", tx.original);
194 match tx
195 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
196 .await
197 {
198 Ok(_) => (),
199 Err(e) => {
200 info!("error replying to request: {:?}", e);
201 }
202 }
203 continue;
204 }
205 }
206 }
207 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
209 match tx.original.method {
210 rsip::Method::Invite | rsip::Method::Ack => {
211 let invitation_handler = match self.create_invitation_handler {
212 Some(ref create_invitation_handler) => {
213 create_invitation_handler(self.config.sip_handler.as_ref()).ok()
214 }
215 _ => default_create_invite_handler(self.config.sip_handler.as_ref()),
216 };
217 let invitation_handler = match invitation_handler {
218 Some(h) => h,
219 None => {
220 info!(?key, "no invite handler configured, rejecting INVITE");
221 match tx
222 .reply_with(
223 rsip::StatusCode::ServiceUnavailable,
224 vec![rsip::Header::Other(
225 "Reason".into(),
226 "SIP;cause=503;text=\"No invite handler configured\""
227 .into(),
228 )],
229 None,
230 )
231 .await
232 {
233 Ok(_) => (),
234 Err(e) => {
235 info!("error replying to request: {:?}", e);
236 }
237 }
238 continue;
239 }
240 };
241 let contact = dialog_layer
242 .endpoint
243 .get_addrs()
244 .first()
245 .map(|addr| rsip::Uri {
246 scheme: Some(rsip::Scheme::Sip),
247 auth: None,
248 host_with_port: addr.addr.clone(),
249 params: vec![],
250 headers: vec![],
251 });
252
253 let dialog = match dialog_layer.get_or_create_server_invite(
254 &tx,
255 state_sender,
256 None,
257 contact,
258 ) {
259 Ok(d) => d,
260 Err(e) => {
261 info!("failed to obtain dialog: {:?}", e);
263 match tx
264 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
265 .await
266 {
267 Ok(_) => (),
268 Err(e) => {
269 info!("error replying to request: {:?}", e);
270 }
271 }
272 continue;
273 }
274 };
275
276 let dialog_id_str = dialog.id().to_string();
277 let token = self.token.child_token();
278 let pending_dialog = PendingDialog {
279 token: token.clone(),
280 dialog: dialog.clone(),
281 state_receiver,
282 };
283
284 let guard = Arc::new(PendingDialogGuard::new(
285 self.invitation.clone(),
286 dialog_id_str.clone(),
287 pending_dialog,
288 ));
289
290 let accept_timeout = self
291 .config
292 .sip_accept_timeout
293 .as_ref()
294 .and_then(|t| parse_duration(t).ok())
295 .unwrap_or_else(|| Duration::from_secs(60));
296
297 let token_ref = token.clone();
298 let guard_ref = guard.clone();
299 tokio::spawn(async move {
300 select! {
301 _ = token_ref.cancelled() => {}
302 _ = tokio::time::sleep(accept_timeout) => {}
303 }
304 guard_ref.drop_async().await;
305 });
306
307 let mut dialog_ref = dialog.clone();
308 let token_ref = token.clone();
309 let routing_state = self.routing_state.clone();
310 tokio::spawn(async move {
311 let invite_loop = async {
312 match invitation_handler
313 .on_invite(
314 dialog_id_str.clone(),
315 token,
316 dialog.clone(),
317 routing_state,
318 )
319 .await
320 {
321 Ok(_) => (),
322 Err(e) => {
323 info!(id = dialog_id_str, "error handling invite: {:?}", e);
324 guard.drop_async().await;
325 }
326 }
327 };
328 select! {
329 _ = token_ref.cancelled() => {}
330 _ = async {
331 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
332 } => {}
333 }
334 });
335 }
336 rsip::Method::Options => {
337 info!(?key, "ignoring out-of-dialog OPTIONS request");
338 continue;
339 }
340 _ => {
341 info!(?key, "received request: {:?}", tx.original.method);
342 match tx.reply(rsip::StatusCode::OK).await {
343 Ok(_) => (),
344 Err(e) => {
345 info!("error replying to request: {:?}", e);
346 }
347 }
348 }
349 }
350 }
351 Ok(())
352 }
353
354 pub fn stop(&self) {
355 info!("stopping");
356 self.token.cancel();
357 }
358
359 pub async fn start_registration(&self) -> Result<usize> {
360 let mut count = 0;
361 if let Some(register_users) = &self.config.register_users {
362 for option in register_users.iter() {
363 match self.register(option.clone()).await {
364 Ok(_) => {
365 count += 1;
366 }
367 Err(e) => {
368 warn!("failed to register user: {:?} {:?}", e, option);
369 }
370 }
371 }
372 }
373 Ok(count)
374 }
375
376 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
377 for (_, handle) in self.registration_handles.lock().await.iter_mut() {
378 handle.stop();
379 }
380
381 if let Some(duration) = wait_for_clear {
382 let live_users = self.alive_users.clone();
383 let check_loop = async move {
384 loop {
385 let is_empty = {
386 let users = live_users
387 .read()
388 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
389 users.is_empty()
390 };
391 if is_empty {
392 break;
393 }
394 tokio::time::sleep(Duration::from_millis(50)).await;
395 }
396 Ok::<(), anyhow::Error>(())
397 };
398 match tokio::time::timeout(duration, check_loop).await {
399 Ok(_) => {}
400 Err(e) => {
401 warn!("failed to wait for clear: {}", e);
402 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
403 }
404 }
405 }
406 Ok(())
407 }
408
409 pub async fn register(&self, option: RegisterOption) -> Result<()> {
410 let user = option.aor();
411 let mut server = option.server.clone();
412 if !server.starts_with("sip:") && !server.starts_with("sips:") {
413 server = format!("sip:{}", server);
414 }
415 let sip_server = match rsip::Uri::try_from(server) {
416 Ok(uri) => uri,
417 Err(e) => {
418 warn!("failed to parse server: {} {:?}", e, option.server);
419 return Err(anyhow::anyhow!("failed to parse server: {}", e));
420 }
421 };
422 let cancel_token = self.token.child_token();
423 let handle = RegistrationHandle {
424 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
425 endpoint_inner: self.endpoint.inner.clone(),
426 option,
427 cancel_token,
428 start_time: Mutex::new(std::time::Instant::now()),
429 last_update: Mutex::new(std::time::Instant::now()),
430 last_response: Mutex::new(None),
431 }),
432 };
433 self.registration_handles
434 .lock()
435 .await
436 .insert(user.clone(), handle.clone());
437 let alive_users = self.alive_users.clone();
438
439 tokio::spawn(async move {
440 *handle.inner.start_time.lock().await = std::time::Instant::now();
441
442 select! {
443 _ = handle.inner.cancel_token.cancelled() => {
444 }
445 _ = async {
446 loop {
447 let user = handle.inner.option.aor();
448 alive_users.write().unwrap().remove(&user);
449 let refresh_time = match handle.do_register(&sip_server, None).await {
450 Ok(expires) => {
451 info!(
452 user = handle.inner.option.aor(),
453 expires = expires,
454 alive_users = alive_users.read().unwrap().len(),
455 "registration refreshed",
456 );
457 alive_users.write().unwrap().insert(user);
458 expires * 3 / 4 }
460 Err(e) => {
461 warn!(
462 user = handle.inner.option.aor(),
463 alive_users = alive_users.read().unwrap().len(),
464 "registration failed: {:?}", e);
465 60
466 }
467 };
468 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
469 }
470 } => {}
471 }
472 handle.do_register(&sip_server, Some(0)).await.ok();
473 alive_users.write().unwrap().remove(&user);
474 });
475 Ok(())
476 }
477}
478
479impl Drop for AppStateInner {
480 fn drop(&mut self) {
481 self.stop();
482 }
483}
484
485impl AppStateBuilder {
486 pub fn new() -> Self {
487 Self {
488 config: None,
489 stream_engine: None,
490 callrecord_sender: None,
491 callrecord_formatter: None,
492 cancel_token: None,
493 create_invitation_handler: None,
494 config_loaded_at: None,
495 config_path: None,
496 }
497 }
498
499 pub fn with_config(mut self, mut config: Config) -> Self {
500 if config.ensure_recording_defaults() {
501 warn!(
502 "recorder_format=ogg requires compiling with the 'opus' feature; falling back to wav"
503 );
504 }
505 self.config = Some(config);
506 if self.config_loaded_at.is_none() {
507 self.config_loaded_at = Some(Utc::now());
508 }
509 self
510 }
511
512 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
513 self.stream_engine = Some(stream_engine);
514 self
515 }
516
517 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
518 self.callrecord_sender = Some(sender);
519 self
520 }
521
522 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
523 self.cancel_token = Some(token);
524 self
525 }
526
527 pub fn with_config_metadata(mut self, path: Option<String>, loaded_at: DateTime<Utc>) -> Self {
528 self.config_path = path;
529 self.config_loaded_at = Some(loaded_at);
530 self
531 }
532
533 pub async fn build(self) -> Result<AppState> {
534 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
535 let token = self
536 .cancel_token
537 .unwrap_or_else(|| CancellationToken::new());
538 let _ = set_cache_dir(&config.media_cache_path);
539
540 let local_ip = if !config.sip_addr.is_empty() {
541 std::net::IpAddr::from_str(config.sip_addr.as_str())?
542 } else {
543 voice_engine::net_tool::get_first_non_loopback_interface()?
544 };
545 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
546 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.sip_port).parse()?;
547
548 let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
549 local_addr,
550 None,
551 Some(token.child_token()),
552 )
553 .await
554 .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
555
556 transport_layer.add_transport(udp_conn.into());
557 info!("start useragent, addr: {}", local_addr);
558
559 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
560 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
561 if let Some(ref user_agent) = config.useragent {
562 endpoint_builder.with_user_agent(user_agent.as_str());
563 }
564 let endpoint = endpoint_builder
565 .with_cancel_token(token.child_token())
566 .with_transport_layer(transport_layer)
567 .with_option(endpoint_option)
568 .build();
569 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
570
571 let stream_engine = self.stream_engine.unwrap_or_default();
572
573 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
574 formatter
575 } else {
576 let formatter = if let Some(ref callrecord) = config.callrecord {
577 DefaultCallRecordFormatter::new_with_config(callrecord)
578 } else {
579 DefaultCallRecordFormatter::default()
580 };
581 Arc::new(formatter)
582 };
583
584 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
585 Some(sender)
586 } else if let Some(ref callrecord) = config.callrecord {
587 let builder = CallRecordManagerBuilder::new()
588 .with_cancel_token(token.child_token())
589 .with_config(callrecord.clone())
590 .with_max_concurrent(32)
591 .with_formatter(callrecord_formatter.clone());
592
593 let mut callrecord_manager = builder.build();
594 let sender = callrecord_manager.sender.clone();
595 tokio::spawn(async move {
596 callrecord_manager.serve().await;
597 });
598 Some(sender)
599 } else {
600 None
601 };
602
603 let app_state = Arc::new(AppStateInner {
604 config,
605 token,
606 stream_engine,
607 callrecord_sender,
608 endpoint,
609 registration_handles: Mutex::new(HashMap::new()),
610 alive_users: Arc::new(RwLock::new(HashSet::new())),
611 dialog_layer: dialog_layer.clone(),
612 create_invitation_handler: self.create_invitation_handler,
613 invitation: Invitation::new(dialog_layer),
614 routing_state: Arc::new(crate::call::RoutingState::new()),
615 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
616 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
617 total_calls: AtomicU64::new(0),
618 total_failed_calls: AtomicU64::new(0),
619 });
620
621 Ok(app_state)
622 }
623}