1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::{
11 FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12 default_create_invite_handler,
13 },
14 registration::{RegistrationHandle, UserCredential},
15 },
16};
17
18use crate::media::{cache::set_cache_dir, engine::StreamEngine};
19use anyhow::Result;
20use chrono::{DateTime, Local};
21use humantime::parse_duration;
22use rsip::prelude::HeadersExt;
23use rsipstack::transaction::{
24 Endpoint, TransactionReceiver,
25 endpoint::{TargetLocator, TransportEventInspector},
26};
27use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
28use std::collections::HashSet;
29use std::str::FromStr;
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use std::{collections::HashMap, net::SocketAddr};
33use std::{
34 path::Path,
35 sync::atomic::{AtomicBool, AtomicU64, Ordering},
36};
37use tokio::select;
38use tokio::sync::Mutex;
39use tokio_util::sync::CancellationToken;
40use tracing::{info, warn};
41
/// Shared state of the SIP user agent: configuration, the SIP endpoint and
/// dialog layer, registration bookkeeping and call counters.
///
/// Instances are created by `AppStateBuilder::build` and shared as `AppState`.
pub struct AppStateInner {
    /// Application configuration, shared read-only.
    pub config: Arc<Config>,
    /// Root cancellation token. `stop` cancels it; per-dialog and
    /// per-registration tasks run on child tokens derived from it.
    pub token: CancellationToken,
    /// Media stream engine (not used directly in this part of the file).
    pub stream_engine: Arc<StreamEngine>,
    /// Call record sender; `None` when neither the builder nor the
    /// configuration provides call recording.
    pub callrecord_sender: Option<CallRecordSender>,
    /// SIP endpoint that yields incoming transactions and is driven by `serve`.
    pub endpoint: Endpoint,
    /// Registration handles keyed by the AOR returned from `RegisterOption::aor`.
    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
    /// AORs whose most recent REGISTER succeeded; entries are removed again
    /// before each refresh and when the registration task ends.
    pub alive_users: Arc<RwLock<HashSet<String>>>,
    /// Dialog layer used to match in-dialog requests and create server dialogs.
    pub dialog_layer: Arc<DialogLayer>,
    /// Optional factory for the INVITE handler; when `None`,
    /// `default_create_invite_handler` is used instead.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Invitation registry built from the dialog layer; a clone is handed to
    /// every `PendingDialogGuard`.
    pub invitation: Invitation,
    /// Routing state passed to the INVITE handler's `on_invite`.
    pub routing_state: Arc<crate::call::RoutingState>,
    /// Not read in this part of the file; presumably playbooks awaiting a
    /// session, keyed by session id — TODO confirm against the users.
    pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
    /// Not read in this part of the file; presumably per-session parameters
    /// awaiting a session — TODO confirm against the users.
    pub pending_params: Arc<Mutex<HashMap<String, HashMap<String, serde_json::Value>>>>,

    /// Not read in this part of the file; presumably the currently active
    /// calls keyed by session id — TODO confirm against the users.
    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
    /// Call counter, initialised to 0 by the builder.
    pub total_calls: AtomicU64,
    /// Failed-call counter, initialised to 0 by the builder.
    pub total_failed_calls: AtomicU64,
    /// Local time at which the state was built (a start timestamp, not a duration).
    pub uptime: DateTime<Local>,
    /// Set by `stop`; while set, new INVITEs are rejected with 503.
    pub shutting_down: Arc<AtomicBool>,
}
63
/// Reference-counted handle to the shared [`AppStateInner`].
pub type AppState = Arc<AppStateInner>;
65
/// Builder collecting the optional pieces needed to construct an `AppState`.
///
/// Every field is optional; `build` falls back to defaults for anything unset.
pub struct AppStateBuilder {
    /// Configuration; `Config::default()` is used when unset.
    pub config: Option<Config>,
    /// Media stream engine; the default engine is used when unset.
    pub stream_engine: Option<Arc<StreamEngine>>,
    /// Pre-built call record sender; takes precedence over `config.callrecord`.
    pub callrecord_sender: Option<CallRecordSender>,
    /// Call record formatter; a `DefaultCallRecordFormatter` is used when unset.
    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
    /// Root cancellation token; a fresh token is created when unset.
    pub cancel_token: Option<CancellationToken>,
    /// Optional factory for the INVITE handler, copied into the built state.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Value set through `with_config_metadata`; `build` does not read it.
    pub config_path: Option<String>,

    /// Optional SIP message inspector installed on the endpoint.
    pub message_inspector: Option<Box<dyn MessageInspector>>,
    /// Optional target locator; takes precedence over `config.rewrites`.
    pub target_locator: Option<Box<dyn TargetLocator>>,
    /// Optional transport event inspector installed on the endpoint.
    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
}
79
80impl AppStateInner {
81 pub fn get_dump_events_file(&self, session_id: &String) -> String {
82 let recorder_root = self.config.recorder_path();
83 let root = Path::new(&recorder_root);
84 if !root.exists() {
85 match std::fs::create_dir_all(root) {
86 Ok(_) => {
87 info!("created dump events root: {}", root.to_string_lossy());
88 }
89 Err(e) => {
90 warn!(
91 "Failed to create dump events root: {} {}",
92 e,
93 root.to_string_lossy()
94 );
95 }
96 }
97 }
98 root.join(format!("{}.events.jsonl", session_id))
99 .to_string_lossy()
100 .to_string()
101 }
102
103 pub fn get_recorder_file(&self, session_id: &String) -> String {
104 let recorder_root = self.config.recorder_path();
105 let root = Path::new(&recorder_root);
106 if !root.exists() {
107 match std::fs::create_dir_all(root) {
108 Ok(_) => {
109 info!("created recorder root: {}", root.to_string_lossy());
110 }
111 Err(e) => {
112 warn!(
113 "Failed to create recorder root: {} {}",
114 e,
115 root.to_string_lossy()
116 );
117 }
118 }
119 }
120 let desired_ext = self.config.recorder_format().extension();
121 let mut filename = session_id.clone();
122 if !filename
123 .to_lowercase()
124 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
125 {
126 filename = format!("{}.{}", filename, desired_ext);
127 }
128 root.join(filename).to_string_lossy().to_string()
129 }
130
131 pub async fn serve(self: Arc<Self>) -> Result<()> {
132 let incoming_txs = self.endpoint.incoming_transactions()?;
133 let token = self.token.child_token();
134 let endpoint_inner = self.endpoint.inner.clone();
135 let dialog_layer = self.dialog_layer.clone();
136 let app_state_clone = self.clone();
137
138 match self.start_registration().await {
139 Ok(count) => {
140 info!("registration started, count: {}", count);
141 }
142 Err(e) => {
143 warn!("failed to start registration: {:?}", e);
144 }
145 }
146
147 tokio::select! {
148 _ = token.cancelled() => {
149 info!("cancelled");
150 }
151 result = endpoint_inner.serve() => {
152 if let Err(e) = result {
153 info!("endpoint serve error: {:?}", e);
154 }
155 }
156 result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
157 if let Err(e) = result {
158 info!("process incoming request error: {:?}", e);
159 }
160 },
161 }
162
163 let timeout = self
166 .config
167 .graceful_shutdown
168 .map(|_| Duration::from_secs(50));
169
170 match self.stop_registration(timeout).await {
171 Ok(_) => {
172 info!("registration stopped, waiting for clear");
173 }
174 Err(e) => {
175 warn!("failed to stop registration: {:?}", e);
176 }
177 }
178 info!("stopping");
179 Ok(())
180 }
181
182 async fn process_incoming_request(
183 self: Arc<Self>,
184 dialog_layer: Arc<DialogLayer>,
185 mut incoming: TransactionReceiver,
186 ) -> Result<()> {
187 while let Some(mut tx) = incoming.recv().await {
188 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
189 info!(?key, "received transaction");
190 if tx.original.to_header()?.tag()?.as_ref().is_some() {
191 match dialog_layer.match_dialog(&tx) {
192 Some(mut d) => {
193 crate::spawn(async move {
194 match d.handle(&mut tx).await {
195 Ok(_) => (),
196 Err(e) => {
197 info!("error handling transaction: {:?}", e);
198 }
199 }
200 });
201 continue;
202 }
203 None => {
204 info!("dialog not found: {}", tx.original);
205 match tx
206 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
207 .await
208 {
209 Ok(_) => (),
210 Err(e) => {
211 info!("error replying to request: {:?}", e);
212 }
213 }
214 continue;
215 }
216 }
217 }
218 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
220 match tx.original.method {
221 rsip::Method::Invite | rsip::Method::Ack => {
222 if self.shutting_down.load(Ordering::Relaxed) {
224 info!(?key, "rejecting INVITE during graceful shutdown");
225 match tx
226 .reply_with(
227 rsip::StatusCode::ServiceUnavailable,
228 vec![rsip::Header::Other(
229 "Reason".into(),
230 "SIP;cause=503;text=\"Server shutting down\"".into(),
231 )],
232 None,
233 )
234 .await
235 {
236 Ok(_) => (),
237 Err(e) => {
238 info!("error replying to request: {:?}", e);
239 }
240 }
241 continue;
242 }
243
244 let invitation_handler = match self.create_invitation_handler {
245 Some(ref create_invitation_handler) => {
246 create_invitation_handler(self.config.handler.as_ref()).ok()
247 }
248 _ => default_create_invite_handler(
249 self.config.handler.as_ref(),
250 Some(self.clone()),
251 ),
252 };
253 let invitation_handler = match invitation_handler {
254 Some(h) => h,
255 None => {
256 info!(?key, "no invite handler configured, rejecting INVITE");
257 match tx
258 .reply_with(
259 rsip::StatusCode::ServiceUnavailable,
260 vec![rsip::Header::Other(
261 "Reason".into(),
262 "SIP;cause=503;text=\"No invite handler configured\""
263 .into(),
264 )],
265 None,
266 )
267 .await
268 {
269 Ok(_) => (),
270 Err(e) => {
271 info!("error replying to request: {:?}", e);
272 }
273 }
274 continue;
275 }
276 };
277 let contact = dialog_layer
278 .endpoint
279 .get_addrs()
280 .first()
281 .map(|addr| rsip::Uri {
282 scheme: Some(rsip::Scheme::Sip),
283 auth: None,
284 host_with_port: addr.addr.clone(),
285 params: vec![],
286 headers: vec![],
287 });
288
289 let dialog = match dialog_layer.get_or_create_server_invite(
290 &tx,
291 state_sender,
292 None,
293 contact,
294 ) {
295 Ok(d) => d,
296 Err(e) => {
297 info!("failed to obtain dialog: {:?}", e);
299 match tx
300 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
301 .await
302 {
303 Ok(_) => (),
304 Err(e) => {
305 info!("error replying to request: {:?}", e);
306 }
307 }
308 continue;
309 }
310 };
311
312 let dialog_id = dialog.id();
313 let dialog_id_str = dialog_id.to_string();
314 let token = self.token.child_token();
315 let pending_dialog = PendingDialog {
316 token: token.clone(),
317 dialog: dialog.clone(),
318 state_receiver,
319 };
320
321 let guard = Arc::new(PendingDialogGuard::new(
322 self.invitation.clone(),
323 dialog_id,
324 pending_dialog,
325 ));
326
327 let accept_timeout = self
328 .config
329 .accept_timeout
330 .as_ref()
331 .and_then(|t| parse_duration(t).ok())
332 .unwrap_or_else(|| Duration::from_secs(60));
333
334 let token_ref = token.clone();
335 let guard_ref = guard.clone();
336 crate::spawn(async move {
337 select! {
338 _ = token_ref.cancelled() => {}
339 _ = tokio::time::sleep(accept_timeout) => {}
340 }
341 guard_ref.drop_async().await;
342 });
343
344 let mut dialog_ref = dialog.clone();
345 let token_ref = token.clone();
346 let routing_state = self.routing_state.clone();
347 let dialog_for_reject = dialog.clone();
348 let guard_ref = guard.clone();
349 crate::spawn(async move {
350 let invite_loop = async {
351 match invitation_handler
352 .on_invite(
353 dialog_id_str.clone(),
354 token.clone(),
355 dialog.clone(),
356 routing_state,
357 )
358 .await
359 {
360 Ok(_) => (),
361 Err(e) => {
362 info!(id = dialog_id_str, "error handling invite: {:?}", e);
364 let reason = format!("Failed to process invite: {}", e);
365 if let Err(reject_err) = dialog_for_reject.reject(
366 Some(rsip::StatusCode::ServiceUnavailable),
367 Some(reason),
368 ) {
369 info!(
370 id = dialog_id_str,
371 "error rejecting call: {:?}", reject_err
372 );
373 }
374 token.cancel();
376 guard_ref.drop_async().await;
377 }
378 }
379 };
380 select! {
381 _ = token_ref.cancelled() => {}
382 _ = async {
383 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
384 } => {}
385 }
386 });
387 }
388 rsip::Method::Options => {
389 info!(?key, "ignoring out-of-dialog OPTIONS request");
390 continue;
391 }
392 _ => {
393 info!(?key, "received request: {:?}", tx.original.method);
394 match tx.reply(rsip::StatusCode::OK).await {
395 Ok(_) => (),
396 Err(e) => {
397 info!("error replying to request: {:?}", e);
398 }
399 }
400 }
401 }
402 }
403 Ok(())
404 }
405
406 pub fn stop(&self) {
407 info!("stopping, marking as shutting down");
408 self.shutting_down.store(true, Ordering::Relaxed);
409 self.token.cancel();
410 }
411
412 pub async fn start_registration(&self) -> Result<usize> {
413 let mut count = 0;
414 if let Some(register_users) = &self.config.register_users {
415 for option in register_users.iter() {
416 match self.register(option.clone()).await {
417 Ok(_) => {
418 count += 1;
419 }
420 Err(e) => {
421 warn!("failed to register user: {:?} {:?}", e, option);
422 }
423 }
424 }
425 }
426 Ok(count)
427 }
428
429 pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
430 let callee_uri = callee
431 .strip_prefix("sip:")
432 .or_else(|| callee.strip_prefix("sips:"))
433 .unwrap_or(callee);
434 let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
435 format!("sip:{}", callee_uri)
436 } else {
437 callee_uri.to_string()
438 };
439
440 let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
441 Ok(uri) => uri,
442 Err(e) => {
443 warn!("failed to parse callee URI: {} {:?}", callee, e);
444 return None;
445 }
446 };
447
448 let callee_host = match &parsed_callee.host_with_port.host {
449 rsip::Host::Domain(domain) => domain.to_string(),
450 rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
451 };
452
453 if let Some(register_users) = &self.config.register_users {
455 for option in register_users.iter() {
456 let mut server = option.server.clone();
457 if !server.starts_with("sip:") && !server.starts_with("sips:") {
458 server = format!("sip:{}", server);
459 }
460
461 let parsed_server = match rsip::Uri::try_from(server.as_str()) {
462 Ok(uri) => uri,
463 Err(e) => {
464 warn!("failed to parse server URI: {} {:?}", option.server, e);
465 continue;
466 }
467 };
468
469 let server_host = match &parsed_server.host_with_port.host {
470 rsip::Host::Domain(domain) => domain.to_string(),
471 rsip::Host::IpAddr(ip) => {
472 if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
474 if ip == callee_ip {
475 if let Some(cred) = &option.credential {
476 info!(
477 callee,
478 username = cred.username,
479 server = option.server,
480 "Auto-injecting credentials from registered user for outbound call (IP match)"
481 );
482 return Some(cred.clone());
483 }
484 }
485 }
486 continue;
487 }
488 };
489
490 if server_host == callee_host {
491 if let Some(cred) = &option.credential {
492 info!(
493 callee,
494 username = cred.username,
495 server = option.server,
496 "Auto-injecting credentials from registered user for outbound call"
497 );
498 return Some(cred.clone());
499 }
500 }
501 }
502 }
503
504 None
505 }
506
507 fn find_credentials_by_ip(
509 &self,
510 callee_ip: &std::net::IpAddr,
511 ) -> Option<crate::useragent::registration::UserCredential> {
512 if let Some(register_users) = &self.config.register_users {
513 for option in register_users.iter() {
514 let mut server = option.server.clone();
515 if !server.starts_with("sip:") && !server.starts_with("sips:") {
516 server = format!("sip:{}", server);
517 }
518
519 if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
520 if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
521 if server_ip == callee_ip {
522 if let Some(cred) = &option.credential {
523 info!(
524 callee_ip = %callee_ip,
525 username = cred.username,
526 server = option.server,
527 "Auto-injecting credentials from registered user for outbound call (IP match)"
528 );
529 return Some(cred.clone());
530 }
531 }
532 }
533 }
534 }
535 }
536 None
537 }
538
539 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
540 for (_, handle) in self.registration_handles.lock().await.iter_mut() {
541 handle.stop();
542 }
543
544 if let Some(duration) = wait_for_clear {
545 let live_users = self.alive_users.clone();
546 let check_loop = async move {
547 loop {
548 let is_empty = {
549 let users = live_users
550 .read()
551 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
552 users.is_empty()
553 };
554 if is_empty {
555 break;
556 }
557 tokio::time::sleep(Duration::from_millis(50)).await;
558 }
559 Ok::<(), anyhow::Error>(())
560 };
561 match tokio::time::timeout(duration, check_loop).await {
562 Ok(_) => {}
563 Err(e) => {
564 warn!("failed to wait for clear: {}", e);
565 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
566 }
567 }
568 }
569 Ok(())
570 }
571
572 pub async fn register(&self, option: RegisterOption) -> Result<()> {
573 let user = option.aor();
574 let mut server = option.server.clone();
575 if !server.starts_with("sip:") && !server.starts_with("sips:") {
576 server = format!("sip:{}", server);
577 }
578 let sip_server = match rsip::Uri::try_from(server) {
579 Ok(uri) => uri,
580 Err(e) => {
581 warn!("failed to parse server: {} {:?}", e, option.server);
582 return Err(anyhow::anyhow!("failed to parse server: {}", e));
583 }
584 };
585 let cancel_token = self.token.child_token();
586 let handle = RegistrationHandle {
587 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
588 endpoint_inner: self.endpoint.inner.clone(),
589 option,
590 cancel_token,
591 start_time: Mutex::new(std::time::Instant::now()),
592 last_update: Mutex::new(std::time::Instant::now()),
593 last_response: Mutex::new(None),
594 }),
595 };
596 self.registration_handles
597 .lock()
598 .await
599 .insert(user.clone(), handle.clone());
600 tracing::debug!(user = user.as_str(), "starting registration task");
601 let alive_users = self.alive_users.clone();
602
603 crate::spawn(async move {
604 *handle.inner.start_time.lock().await = std::time::Instant::now();
605
606 select! {
607 _ = handle.inner.cancel_token.cancelled() => {
608 }
609 _ = async {
610 loop {
611 let user = handle.inner.option.aor();
612 alive_users.write().unwrap().remove(&user);
613 let refresh_time = match handle.do_register(&sip_server, None).await {
614 Ok(expires) => {
615 info!(
616 user = handle.inner.option.aor(),
617 expires = expires,
618 alive_users = alive_users.read().unwrap().len(),
619 "registration refreshed",
620 );
621 alive_users.write().unwrap().insert(user);
622 expires * 3 / 4 }
624 Err(e) => {
625 warn!(
626 user = handle.inner.option.aor(),
627 alive_users = alive_users.read().unwrap().len(),
628 "registration failed: {:?}", e);
629 60
630 }
631 };
632 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
633 }
634 } => {}
635 }
636 handle.do_register(&sip_server, Some(0)).await.ok();
637 alive_users.write().unwrap().remove(&user);
638 });
639 Ok(())
640 }
641}
642
643impl Drop for AppStateInner {
644 fn drop(&mut self) {
645 self.stop();
646 }
647}
648
649impl AppStateBuilder {
650 pub fn new() -> Self {
651 Self {
652 config: None,
653 stream_engine: None,
654 callrecord_sender: None,
655 callrecord_formatter: None,
656 cancel_token: None,
657 create_invitation_handler: None,
658 config_path: None,
659 message_inspector: None,
660 target_locator: None,
661 transport_inspector: None,
662 }
663 }
664
665 pub fn with_config(mut self, config: Config) -> Self {
666 self.config = Some(config);
667 self
668 }
669
670 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
671 self.stream_engine = Some(stream_engine);
672 self
673 }
674
675 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
676 self.callrecord_sender = Some(sender);
677 self
678 }
679
680 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
681 self.cancel_token = Some(token);
682 self
683 }
684
685 pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
686 self.config_path = path;
687 self
688 }
689
690 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
691 self.message_inspector = Some(inspector);
692 self
693 }
694 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
695 self.target_locator = Some(locator);
696 self
697 }
698
699 pub fn with_transport_inspector(
700 &mut self,
701 inspector: Box<dyn TransportEventInspector>,
702 ) -> &mut Self {
703 self.transport_inspector = Some(inspector);
704 self
705 }
706
707 pub async fn build(self) -> Result<AppState> {
708 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
709 let token = self
710 .cancel_token
711 .unwrap_or_else(|| CancellationToken::new());
712 let _ = set_cache_dir(&config.media_cache_path);
713
714 let local_ip = if !config.addr.is_empty() {
715 std::net::IpAddr::from_str(config.addr.as_str())?
716 } else {
717 crate::net_tool::get_first_non_loopback_interface()?
718 };
719 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
720 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
721
722 #[cfg(unix)]
724 let std_socket = {
725 use socket2::{Domain, Protocol, SockAddr, Socket, Type};
726
727 let domain = if local_addr.is_ipv4() {
728 Domain::IPV4
729 } else {
730 Domain::IPV6
731 };
732 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
733 .map_err(|err| anyhow::anyhow!("Failed to create UDP socket: {}", err))?;
734
735 socket
736 .set_reuse_address(true)
737 .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEADDR: {}", err))?;
738
739 #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "cygwin")))]
741 socket
742 .set_reuse_port(true)
743 .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEPORT: {}", err))?;
744
745 socket
746 .bind(&SockAddr::from(local_addr))
747 .map_err(|err| anyhow::anyhow!("Failed to bind UDP socket: {}", err))?;
748
749 let std_socket: std::net::UdpSocket = socket.into();
750 std_socket
751 };
752
753 #[cfg(not(unix))]
754 let std_socket = std::net::UdpSocket::bind(local_addr)?;
755
756 std_socket.set_nonblocking(true)?;
757 let tokio_socket = tokio::net::UdpSocket::from_std(std_socket)?;
758 let actual_addr = tokio_socket.local_addr()?;
760
761 let udp_inner = rsipstack::transport::udp::UdpInner {
762 conn: tokio_socket,
763 addr: rsipstack::transport::SipAddr {
764 r#type: Some(rsip::transport::Transport::Udp),
765 addr: actual_addr.into(),
766 },
767 };
768
769 let udp_conn = rsipstack::transport::udp::UdpConnection::attach(
770 udp_inner,
771 None,
772 Some(token.child_token()),
773 )
774 .await;
775
776 transport_layer.add_transport(udp_conn.into());
777 info!(
778 "start useragent, addr: {} (SO_REUSEPORT enabled)",
779 local_addr
780 );
781
782 if let Some(tls_port) = config.tls_port {
784 let tls_addr: std::net::SocketAddr = format!("{}:{}", local_ip, tls_port).parse()?;
785 let tls_sip_addr = rsipstack::transport::SipAddr {
786 r#type: Some(rsip::transport::Transport::Tls),
787 addr: tls_addr.into(),
788 };
789 let mut tls_cfg = rsipstack::transport::tls::TlsConfig::default();
790 if let Some(ref cert_path) = config.tls_cert_file {
791 tls_cfg.cert = Some(
792 std::fs::read(cert_path)
793 .map_err(|e| anyhow::anyhow!("tls_cert_file: {}", e))?,
794 );
795 }
796 if let Some(ref key_path) = config.tls_key_file {
797 tls_cfg.key = Some(
798 std::fs::read(key_path).map_err(|e| anyhow::anyhow!("tls_key_file: {}", e))?,
799 );
800 }
801 let external_tls_addr = config
802 .external_ip
803 .as_ref()
804 .and_then(|ip| format!("{}:{}", ip, tls_port).parse().ok());
805 match rsipstack::transport::tls::TlsListenerConnection::new(
806 tls_sip_addr,
807 external_tls_addr,
808 tls_cfg,
809 )
810 .await
811 {
812 Ok(tls_conn) => {
813 transport_layer.add_transport(tls_conn.into());
814 info!("TLS SIP transport started on {}:{}", local_ip, tls_port);
815 }
816 Err(e) => {
817 return Err(anyhow::anyhow!("Failed to start TLS SIP transport: {}", e));
818 }
819 }
820 }
821
822 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
823 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
824 if let Some(ref user_agent) = config.useragent {
825 endpoint_builder.with_user_agent(user_agent.as_str());
826 }
827
828 let mut endpoint_builder = endpoint_builder
829 .with_cancel_token(token.child_token())
830 .with_transport_layer(transport_layer)
831 .with_option(endpoint_option);
832
833 if let Some(inspector) = self.message_inspector {
834 endpoint_builder = endpoint_builder.with_inspector(inspector);
835 }
836
837 if let Some(locator) = self.target_locator {
838 endpoint_builder.with_target_locator(locator);
839 } else if let Some(ref rules) = config.rewrites {
840 endpoint_builder
841 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
842 }
843
844 if let Some(inspector) = self.transport_inspector {
845 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
846 }
847
848 let endpoint = endpoint_builder.build();
849 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
850
851 let stream_engine = self.stream_engine.unwrap_or_default();
852
853 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
854 formatter
855 } else {
856 let formatter = if let Some(ref callrecord) = config.callrecord {
857 DefaultCallRecordFormatter::new_with_config(callrecord)
858 } else {
859 DefaultCallRecordFormatter::default()
860 };
861 Arc::new(formatter)
862 };
863
864 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
865 Some(sender)
866 } else if let Some(ref callrecord) = config.callrecord {
867 let builder = CallRecordManagerBuilder::new()
868 .with_cancel_token(token.child_token())
869 .with_config(callrecord.clone())
870 .with_max_concurrent(32)
871 .with_formatter(callrecord_formatter.clone());
872
873 let mut callrecord_manager = builder.build();
874 let sender = callrecord_manager.sender.clone();
875 crate::spawn(async move {
876 callrecord_manager.serve().await;
877 });
878 Some(sender)
879 } else {
880 None
881 };
882
883 let app_state = Arc::new(AppStateInner {
884 config,
885 token,
886 stream_engine,
887 callrecord_sender,
888 endpoint,
889 registration_handles: Mutex::new(HashMap::new()),
890 alive_users: Arc::new(RwLock::new(HashSet::new())),
891 dialog_layer: dialog_layer.clone(),
892 create_invitation_handler: self.create_invitation_handler,
893 invitation: Invitation::new(dialog_layer),
894 routing_state: Arc::new(crate::call::RoutingState::new()),
895 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
896 pending_params: Arc::new(Mutex::new(HashMap::new())),
897 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
898 total_calls: AtomicU64::new(0),
899 total_failed_calls: AtomicU64::new(0),
900 uptime: Local::now(),
901 shutting_down: Arc::new(AtomicBool::new(false)),
902 });
903
904 Ok(app_state)
905 }
906}