1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::{
11 FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12 default_create_invite_handler,
13 },
14 registration::{RegistrationHandle, UserCredential},
15 },
16};
17
18use crate::media::{cache::set_cache_dir, engine::StreamEngine};
19use anyhow::Result;
20use chrono::{DateTime, Local};
21use humantime::parse_duration;
22use rsip::prelude::HeadersExt;
23use rsipstack::transaction::{
24 Endpoint, TransactionReceiver,
25 endpoint::{TargetLocator, TransportEventInspector},
26};
27use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
28use std::collections::HashSet;
29use std::str::FromStr;
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use std::{collections::HashMap, net::SocketAddr};
33use std::{
34 path::Path,
35 sync::atomic::{AtomicBool, AtomicU64, Ordering},
36};
37use tokio::select;
38use tokio::sync::Mutex;
39use tokio_util::sync::CancellationToken;
40use tracing::{info, warn};
41
/// Shared runtime state of the SIP user agent.
///
/// Instances are created by `AppStateBuilder::build` and shared behind an
/// `Arc` (see the `AppState` alias).
pub struct AppStateInner {
    /// Application configuration.
    pub config: Arc<Config>,
    /// Root cancellation token; cancelled by `stop`.
    pub token: CancellationToken,
    /// Media stream engine.
    pub stream_engine: Arc<StreamEngine>,
    /// Sender for call records, when call recording is set up.
    pub callrecord_sender: Option<CallRecordSender>,
    /// SIP endpoint that delivers incoming transactions.
    pub endpoint: Endpoint,
    /// Registration handles keyed by the address-of-record of the registered user.
    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
    /// Addresses-of-record whose most recent REGISTER succeeded.
    pub alive_users: Arc<RwLock<HashSet<String>>>,
    /// Dialog layer used to match and create SIP dialogs.
    pub dialog_layer: Arc<DialogLayer>,
    /// Optional factory overriding the default invitation handler.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Invitation state built on top of the dialog layer.
    pub invitation: Invitation,
    /// Routing state handed to the invitation handler for each INVITE.
    pub routing_state: Arc<crate::call::RoutingState>,
    /// Pending entries with their creation time; `serve` purges entries older
    /// than 300 seconds once a minute.
    // NOTE(review): presumably keyed by session id with the playbook name as value — confirm.
    pub pending_playbooks: Arc<Mutex<HashMap<String, (String, std::time::Instant)>>>,

    /// Currently active calls.
    // NOTE(review): key is presumably the session id — confirm against callers.
    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
    /// Call counter, initialised to 0 at build time (updated outside this file).
    pub total_calls: AtomicU64,
    /// Failed-call counter, initialised to 0 at build time (updated outside this file).
    pub total_failed_calls: AtomicU64,
    /// Local time at which this state was built.
    pub uptime: DateTime<Local>,
    /// Set by `stop`; while set, new INVITEs are rejected with 503.
    pub shutting_down: Arc<AtomicBool>,
}
62
/// Shared, reference-counted handle to [`AppStateInner`].
pub type AppState = Arc<AppStateInner>;
64
/// Builder collecting the optional pieces needed to create an [`AppState`].
///
/// Every field is optional; `build` falls back to defaults for anything left
/// unset.
pub struct AppStateBuilder {
    /// Configuration; `Config::default()` is used when absent.
    pub config: Option<Config>,
    /// Stream engine; the default engine is used when absent.
    pub stream_engine: Option<Arc<StreamEngine>>,
    /// Pre-built call record sender; when absent, one is created from
    /// `config.callrecord` if that is configured.
    pub callrecord_sender: Option<CallRecordSender>,
    /// Call record formatter; a `DefaultCallRecordFormatter` is used when absent.
    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
    /// Root cancellation token; a fresh token is created when absent.
    pub cancel_token: Option<CancellationToken>,
    /// Optional factory overriding the default invitation handler.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Value supplied through `with_config_metadata`.
    // NOTE(review): presumably the path the config was loaded from; not read by `build` in this file.
    pub config_path: Option<String>,

    /// Optional SIP message inspector installed on the endpoint.
    pub message_inspector: Option<Box<dyn MessageInspector>>,
    /// Optional target locator; takes precedence over `config.rewrites`.
    pub target_locator: Option<Box<dyn TargetLocator>>,
    /// Optional transport event inspector installed on the endpoint.
    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
}
78
79impl AppStateInner {
80 pub fn get_dump_events_file(&self, session_id: &String) -> String {
81 let recorder_root = self.config.recorder_path();
82 let root = Path::new(&recorder_root);
83 if !root.exists() {
84 match std::fs::create_dir_all(root) {
85 Ok(_) => {
86 info!("created dump events root: {}", root.to_string_lossy());
87 }
88 Err(e) => {
89 warn!(
90 "Failed to create dump events root: {} {}",
91 e,
92 root.to_string_lossy()
93 );
94 }
95 }
96 }
97 root.join(format!("{}.events.jsonl", session_id))
98 .to_string_lossy()
99 .to_string()
100 }
101
102 pub fn get_recorder_file(&self, session_id: &String) -> String {
103 let recorder_root = self.config.recorder_path();
104 let root = Path::new(&recorder_root);
105 if !root.exists() {
106 match std::fs::create_dir_all(root) {
107 Ok(_) => {
108 info!("created recorder root: {}", root.to_string_lossy());
109 }
110 Err(e) => {
111 warn!(
112 "Failed to create recorder root: {} {}",
113 e,
114 root.to_string_lossy()
115 );
116 }
117 }
118 }
119 let desired_ext = self.config.recorder_format().extension();
120 let mut filename = session_id.clone();
121 if !filename
122 .to_lowercase()
123 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
124 {
125 filename = format!("{}.{}", filename, desired_ext);
126 }
127 root.join(filename).to_string_lossy().to_string()
128 }
129
130 pub async fn serve(self: Arc<Self>) -> Result<()> {
131 let incoming_txs = self.endpoint.incoming_transactions()?;
132 let token = self.token.child_token();
133 let endpoint_inner = self.endpoint.inner.clone();
134 let dialog_layer = self.dialog_layer.clone();
135 let app_state_clone = self.clone();
136
137 match self.start_registration().await {
138 Ok(count) => {
139 info!("registration started, count: {}", count);
140 }
141 Err(e) => {
142 warn!("failed to start registration: {:?}", e);
143 }
144 }
145
146 let pending_cleanup_state = self.clone();
147 let pending_cleanup_token = token.clone();
148 crate::spawn(async move {
149 let mut interval = tokio::time::interval(Duration::from_secs(60));
150 let ttl = Duration::from_secs(300);
151 loop {
152 tokio::select! {
153 _ = pending_cleanup_token.cancelled() => break,
154 _ = interval.tick() => {
155 let mut pending = pending_cleanup_state.pending_playbooks.lock().await;
156 let before = pending.len();
157 pending.retain(|_, (_, created_at)| created_at.elapsed() < ttl);
158 let removed = before - pending.len();
159 if removed > 0 {
160 info!(removed, remaining = pending.len(), "cleaned up stale pending_playbooks entries");
161 }
162 }
163 }
164 }
165 });
166
167 tokio::select! {
168 _ = token.cancelled() => {
169 info!("cancelled");
170 }
171 result = endpoint_inner.serve() => {
172 if let Err(e) = result {
173 info!("endpoint serve error: {:?}", e);
174 }
175 }
176 result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
177 if let Err(e) = result {
178 info!("process incoming request error: {:?}", e);
179 }
180 },
181 }
182
183 let timeout = self
186 .config
187 .graceful_shutdown
188 .map(|_| Duration::from_secs(50));
189
190 match self.stop_registration(timeout).await {
191 Ok(_) => {
192 info!("registration stopped, waiting for clear");
193 }
194 Err(e) => {
195 warn!("failed to stop registration: {:?}", e);
196 }
197 }
198 info!("stopping");
199 Ok(())
200 }
201
202 async fn process_incoming_request(
203 self: Arc<Self>,
204 dialog_layer: Arc<DialogLayer>,
205 mut incoming: TransactionReceiver,
206 ) -> Result<()> {
207 while let Some(mut tx) = incoming.recv().await {
208 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
209 info!(?key, "received transaction");
210 if tx.original.to_header()?.tag()?.as_ref().is_some() {
211 match dialog_layer.match_dialog(&tx) {
212 Some(mut d) => {
213 crate::spawn(async move {
214 match d.handle(&mut tx).await {
215 Ok(_) => (),
216 Err(e) => {
217 info!("error handling transaction: {:?}", e);
218 }
219 }
220 });
221 continue;
222 }
223 None => {
224 info!("dialog not found: {}", tx.original);
225 match tx
226 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
227 .await
228 {
229 Ok(_) => (),
230 Err(e) => {
231 info!("error replying to request: {:?}", e);
232 }
233 }
234 continue;
235 }
236 }
237 }
238 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
240 match tx.original.method {
241 rsip::Method::Invite | rsip::Method::Ack => {
242 if self.shutting_down.load(Ordering::Relaxed) {
244 info!(?key, "rejecting INVITE during graceful shutdown");
245 match tx
246 .reply_with(
247 rsip::StatusCode::ServiceUnavailable,
248 vec![rsip::Header::Other(
249 "Reason".into(),
250 "SIP;cause=503;text=\"Server shutting down\"".into(),
251 )],
252 None,
253 )
254 .await
255 {
256 Ok(_) => (),
257 Err(e) => {
258 info!("error replying to request: {:?}", e);
259 }
260 }
261 continue;
262 }
263
264 let invitation_handler = match self.create_invitation_handler {
265 Some(ref create_invitation_handler) => {
266 create_invitation_handler(self.config.handler.as_ref()).ok()
267 }
268 _ => default_create_invite_handler(
269 self.config.handler.as_ref(),
270 Some(self.clone()),
271 ),
272 };
273 let invitation_handler = match invitation_handler {
274 Some(h) => h,
275 None => {
276 info!(?key, "no invite handler configured, rejecting INVITE");
277 match tx
278 .reply_with(
279 rsip::StatusCode::ServiceUnavailable,
280 vec![rsip::Header::Other(
281 "Reason".into(),
282 "SIP;cause=503;text=\"No invite handler configured\""
283 .into(),
284 )],
285 None,
286 )
287 .await
288 {
289 Ok(_) => (),
290 Err(e) => {
291 info!("error replying to request: {:?}", e);
292 }
293 }
294 continue;
295 }
296 };
297 let contact = dialog_layer
298 .endpoint
299 .get_addrs()
300 .first()
301 .map(|addr| rsip::Uri {
302 scheme: Some(rsip::Scheme::Sip),
303 auth: None,
304 host_with_port: addr.addr.clone(),
305 params: vec![],
306 headers: vec![],
307 });
308
309 let dialog = match dialog_layer.get_or_create_server_invite(
310 &tx,
311 state_sender,
312 None,
313 contact,
314 ) {
315 Ok(d) => d,
316 Err(e) => {
317 info!("failed to obtain dialog: {:?}", e);
319 match tx
320 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
321 .await
322 {
323 Ok(_) => (),
324 Err(e) => {
325 info!("error replying to request: {:?}", e);
326 }
327 }
328 continue;
329 }
330 };
331
332 let dialog_id = dialog.id();
333 let dialog_id_str = dialog_id.to_string();
334 let token = self.token.child_token();
335 let pending_dialog = PendingDialog {
336 token: token.clone(),
337 dialog: dialog.clone(),
338 state_receiver,
339 };
340
341 let guard = Arc::new(PendingDialogGuard::new(
342 self.invitation.clone(),
343 dialog_id,
344 pending_dialog,
345 ));
346
347 let accept_timeout = self
348 .config
349 .accept_timeout
350 .as_ref()
351 .and_then(|t| parse_duration(t).ok())
352 .unwrap_or_else(|| Duration::from_secs(60));
353
354 let token_ref = token.clone();
355 let guard_ref = guard.clone();
356 crate::spawn(async move {
357 select! {
358 _ = token_ref.cancelled() => {}
359 _ = tokio::time::sleep(accept_timeout) => {}
360 }
361 guard_ref.drop_async().await;
362 });
363
364 let mut dialog_ref = dialog.clone();
365 let token_ref = token.clone();
366 let routing_state = self.routing_state.clone();
367 let dialog_for_reject = dialog.clone();
368 let guard_ref = guard.clone();
369 crate::spawn(async move {
370 let invite_loop = async {
371 match invitation_handler
372 .on_invite(
373 dialog_id_str.clone(),
374 token.clone(),
375 dialog.clone(),
376 routing_state,
377 )
378 .await
379 {
380 Ok(_) => (),
381 Err(e) => {
382 info!(id = dialog_id_str, "error handling invite: {:?}", e);
384 let reason = format!("Failed to process invite: {}", e);
385 if let Err(reject_err) = dialog_for_reject.reject(
386 Some(rsip::StatusCode::ServiceUnavailable),
387 Some(reason),
388 ) {
389 info!(
390 id = dialog_id_str,
391 "error rejecting call: {:?}", reject_err
392 );
393 }
394 token.cancel();
396 guard_ref.drop_async().await;
397 }
398 }
399 };
400 select! {
401 _ = token_ref.cancelled() => {}
402 _ = async {
403 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
404 } => {}
405 }
406 });
407 }
408 rsip::Method::Options => {
409 info!(?key, "ignoring out-of-dialog OPTIONS request");
410 continue;
411 }
412 _ => {
413 info!(?key, "received request: {:?}", tx.original.method);
414 match tx.reply(rsip::StatusCode::OK).await {
415 Ok(_) => (),
416 Err(e) => {
417 info!("error replying to request: {:?}", e);
418 }
419 }
420 }
421 }
422 }
423 Ok(())
424 }
425
/// Marks the application as shutting down and cancels the root token.
///
/// The flag is stored before the token is cancelled; the incoming-request
/// loop reads it to reject new INVITEs with 503.
pub fn stop(&self) {
    info!("stopping, marking as shutting down");
    self.shutting_down.store(true, Ordering::Relaxed);
    self.token.cancel();
}
431
432 pub async fn start_registration(&self) -> Result<usize> {
433 let mut count = 0;
434 if let Some(register_users) = &self.config.register_users {
435 for option in register_users.iter() {
436 match self.register(option.clone()).await {
437 Ok(_) => {
438 count += 1;
439 }
440 Err(e) => {
441 warn!("failed to register user: {:?} {:?}", e, option);
442 }
443 }
444 }
445 }
446 Ok(count)
447 }
448
449 pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
450 let callee_uri = callee
451 .strip_prefix("sip:")
452 .or_else(|| callee.strip_prefix("sips:"))
453 .unwrap_or(callee);
454 let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
455 format!("sip:{}", callee_uri)
456 } else {
457 callee_uri.to_string()
458 };
459
460 let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
461 Ok(uri) => uri,
462 Err(e) => {
463 warn!("failed to parse callee URI: {} {:?}", callee, e);
464 return None;
465 }
466 };
467
468 let callee_host = match &parsed_callee.host_with_port.host {
469 rsip::Host::Domain(domain) => domain.to_string(),
470 rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
471 };
472
473 if let Some(register_users) = &self.config.register_users {
475 for option in register_users.iter() {
476 let mut server = option.server.clone();
477 if !server.starts_with("sip:") && !server.starts_with("sips:") {
478 server = format!("sip:{}", server);
479 }
480
481 let parsed_server = match rsip::Uri::try_from(server.as_str()) {
482 Ok(uri) => uri,
483 Err(e) => {
484 warn!("failed to parse server URI: {} {:?}", option.server, e);
485 continue;
486 }
487 };
488
489 let server_host = match &parsed_server.host_with_port.host {
490 rsip::Host::Domain(domain) => domain.to_string(),
491 rsip::Host::IpAddr(ip) => {
492 if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
494 if ip == callee_ip {
495 if let Some(cred) = &option.credential {
496 info!(
497 callee,
498 username = cred.username,
499 server = option.server,
500 "Auto-injecting credentials from registered user for outbound call (IP match)"
501 );
502 return Some(cred.clone());
503 }
504 }
505 }
506 continue;
507 }
508 };
509
510 if server_host == callee_host {
511 if let Some(cred) = &option.credential {
512 info!(
513 callee,
514 username = cred.username,
515 server = option.server,
516 "Auto-injecting credentials from registered user for outbound call"
517 );
518 return Some(cred.clone());
519 }
520 }
521 }
522 }
523
524 None
525 }
526
527 fn find_credentials_by_ip(
529 &self,
530 callee_ip: &std::net::IpAddr,
531 ) -> Option<crate::useragent::registration::UserCredential> {
532 if let Some(register_users) = &self.config.register_users {
533 for option in register_users.iter() {
534 let mut server = option.server.clone();
535 if !server.starts_with("sip:") && !server.starts_with("sips:") {
536 server = format!("sip:{}", server);
537 }
538
539 if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
540 if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
541 if server_ip == callee_ip {
542 if let Some(cred) = &option.credential {
543 info!(
544 callee_ip = %callee_ip,
545 username = cred.username,
546 server = option.server,
547 "Auto-injecting credentials from registered user for outbound call (IP match)"
548 );
549 return Some(cred.clone());
550 }
551 }
552 }
553 }
554 }
555 }
556 None
557 }
558
559 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
560 {
561 let mut handles = self.registration_handles.lock().await;
562 for (_, handle) in handles.iter_mut() {
563 handle.stop();
564 }
565 handles.clear();
566 }
567
568 if let Some(duration) = wait_for_clear {
569 let live_users = self.alive_users.clone();
570 let check_loop = async move {
571 loop {
572 let is_empty = {
573 let users = live_users
574 .read()
575 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
576 users.is_empty()
577 };
578 if is_empty {
579 break;
580 }
581 tokio::time::sleep(Duration::from_millis(50)).await;
582 }
583 Ok::<(), anyhow::Error>(())
584 };
585 match tokio::time::timeout(duration, check_loop).await {
586 Ok(_) => {}
587 Err(e) => {
588 warn!("failed to wait for clear: {}", e);
589 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
590 }
591 }
592 }
593 Ok(())
594 }
595
596 pub async fn register(&self, option: RegisterOption) -> Result<()> {
597 let user = option.aor();
598 let mut server = option.server.clone();
599 if !server.starts_with("sip:") && !server.starts_with("sips:") {
600 server = format!("sip:{}", server);
601 }
602 let sip_server = match rsip::Uri::try_from(server) {
603 Ok(uri) => uri,
604 Err(e) => {
605 warn!("failed to parse server: {} {:?}", e, option.server);
606 return Err(anyhow::anyhow!("failed to parse server: {}", e));
607 }
608 };
609 let cancel_token = self.token.child_token();
610 let handle = RegistrationHandle {
611 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
612 endpoint_inner: self.endpoint.inner.clone(),
613 option,
614 cancel_token,
615 start_time: Mutex::new(std::time::Instant::now()),
616 last_update: Mutex::new(std::time::Instant::now()),
617 last_response: Mutex::new(None),
618 }),
619 };
620 self.registration_handles
621 .lock()
622 .await
623 .insert(user.clone(), handle.clone());
624 tracing::debug!(user = user.as_str(), "starting registration task");
625 let alive_users = self.alive_users.clone();
626
627 crate::spawn(async move {
628 *handle.inner.start_time.lock().await = std::time::Instant::now();
629
630 select! {
631 _ = handle.inner.cancel_token.cancelled() => {
632 }
633 _ = async {
634 loop {
635 let user = handle.inner.option.aor();
636 alive_users.write().unwrap().remove(&user);
637 let refresh_time = match handle.do_register(&sip_server, None).await {
638 Ok(expires) => {
639 info!(
640 user = handle.inner.option.aor(),
641 expires = expires,
642 alive_users = alive_users.read().unwrap().len(),
643 "registration refreshed",
644 );
645 alive_users.write().unwrap().insert(user);
646 expires * 3 / 4 }
648 Err(e) => {
649 warn!(
650 user = handle.inner.option.aor(),
651 alive_users = alive_users.read().unwrap().len(),
652 "registration failed: {:?}", e);
653 60
654 }
655 };
656 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
657 }
658 } => {}
659 }
660 handle.do_register(&sip_server, Some(0)).await.ok();
661 alive_users.write().unwrap().remove(&user);
662 });
663 Ok(())
664 }
665}
666
/// Dropping the state stops it: the shutdown flag is set and the root token
/// is cancelled through `stop`.
impl Drop for AppStateInner {
    fn drop(&mut self) {
        self.stop();
    }
}
672
673impl AppStateBuilder {
/// Creates a builder with every option unset.
pub fn new() -> Self {
    Self {
        config: None,
        stream_engine: None,
        callrecord_sender: None,
        callrecord_formatter: None,
        cancel_token: None,
        create_invitation_handler: None,
        config_path: None,
        message_inspector: None,
        target_locator: None,
        transport_inspector: None,
    }
}
688
689 pub fn with_config(mut self, config: Config) -> Self {
690 self.config = Some(config);
691 self
692 }
693
694 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
695 self.stream_engine = Some(stream_engine);
696 self
697 }
698
699 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
700 self.callrecord_sender = Some(sender);
701 self
702 }
703
704 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
705 self.cancel_token = Some(token);
706 self
707 }
708
709 pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
710 self.config_path = path;
711 self
712 }
713
/// Sets the SIP message inspector installed on the endpoint by `build`.
///
/// Unlike the by-value setters, this one borrows the builder mutably and
/// returns the same borrow.
pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
    self.message_inspector = Some(inspector);
    self
}
/// Sets the target locator installed on the endpoint by `build`; it takes
/// precedence over the rewrite rules from the configuration.
///
/// Borrows the builder mutably and returns the same borrow.
pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
    self.target_locator = Some(locator);
    self
}
722
/// Sets the transport event inspector installed on the endpoint by `build`.
///
/// Borrows the builder mutably and returns the same borrow.
pub fn with_transport_inspector(
    &mut self,
    inspector: Box<dyn TransportEventInspector>,
) -> &mut Self {
    self.transport_inspector = Some(inspector);
    self
}
730
731 pub async fn build(self) -> Result<AppState> {
732 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
733 let token = self
734 .cancel_token
735 .unwrap_or_else(|| CancellationToken::new());
736 let _ = set_cache_dir(&config.media_cache_path);
737
738 let local_ip = if !config.addr.is_empty() {
739 std::net::IpAddr::from_str(config.addr.as_str())?
740 } else {
741 crate::net_tool::get_first_non_loopback_interface()?
742 };
743 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
744 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
745
746 #[cfg(unix)]
748 let std_socket = {
749 use socket2::{Domain, Protocol, SockAddr, Socket, Type};
750
751 let domain = if local_addr.is_ipv4() {
752 Domain::IPV4
753 } else {
754 Domain::IPV6
755 };
756 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
757 .map_err(|err| anyhow::anyhow!("Failed to create UDP socket: {}", err))?;
758
759 socket
760 .set_reuse_address(true)
761 .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEADDR: {}", err))?;
762
763 #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "cygwin")))]
765 socket
766 .set_reuse_port(true)
767 .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEPORT: {}", err))?;
768
769 socket
770 .bind(&SockAddr::from(local_addr))
771 .map_err(|err| anyhow::anyhow!("Failed to bind UDP socket: {}", err))?;
772
773 let std_socket: std::net::UdpSocket = socket.into();
774 std_socket
775 };
776
777 #[cfg(not(unix))]
778 let std_socket = std::net::UdpSocket::bind(local_addr)?;
779
780 std_socket.set_nonblocking(true)?;
781 let tokio_socket = tokio::net::UdpSocket::from_std(std_socket)?;
782 let actual_addr = tokio_socket.local_addr()?;
784
785 let udp_inner = rsipstack::transport::udp::UdpInner {
786 conn: tokio_socket,
787 addr: rsipstack::transport::SipAddr {
788 r#type: Some(rsip::transport::Transport::Udp),
789 addr: actual_addr.into(),
790 },
791 };
792
793 let udp_conn = rsipstack::transport::udp::UdpConnection::attach(
794 udp_inner,
795 None,
796 Some(token.child_token()),
797 )
798 .await;
799
800 transport_layer.add_transport(udp_conn.into());
801 info!(
802 "start useragent, addr: {} (SO_REUSEPORT enabled)",
803 local_addr
804 );
805
806 if let Some(tls_port) = config.tls_port {
808 let tls_addr: std::net::SocketAddr = format!("{}:{}", local_ip, tls_port).parse()?;
809 let tls_sip_addr = rsipstack::transport::SipAddr {
810 r#type: Some(rsip::transport::Transport::Tls),
811 addr: tls_addr.into(),
812 };
813 let mut tls_cfg = rsipstack::transport::tls::TlsConfig::default();
814 if let Some(ref cert_path) = config.tls_cert_file {
815 tls_cfg.cert = Some(
816 std::fs::read(cert_path)
817 .map_err(|e| anyhow::anyhow!("tls_cert_file: {}", e))?,
818 );
819 }
820 if let Some(ref key_path) = config.tls_key_file {
821 tls_cfg.key = Some(
822 std::fs::read(key_path).map_err(|e| anyhow::anyhow!("tls_key_file: {}", e))?,
823 );
824 }
825 let external_tls_addr = config
826 .external_ip
827 .as_ref()
828 .and_then(|ip| format!("{}:{}", ip, tls_port).parse().ok());
829 match rsipstack::transport::tls::TlsListenerConnection::new(
830 tls_sip_addr,
831 external_tls_addr,
832 tls_cfg,
833 )
834 .await
835 {
836 Ok(tls_conn) => {
837 transport_layer.add_transport(tls_conn.into());
838 info!("TLS SIP transport started on {}:{}", local_ip, tls_port);
839 }
840 Err(e) => {
841 return Err(anyhow::anyhow!("Failed to start TLS SIP transport: {}", e));
842 }
843 }
844 }
845
846 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
847 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
848 if let Some(ref user_agent) = config.useragent {
849 endpoint_builder.with_user_agent(user_agent.as_str());
850 }
851
852 let mut endpoint_builder = endpoint_builder
853 .with_cancel_token(token.child_token())
854 .with_transport_layer(transport_layer)
855 .with_option(endpoint_option);
856
857 if let Some(inspector) = self.message_inspector {
858 endpoint_builder = endpoint_builder.with_inspector(inspector);
859 }
860
861 if let Some(locator) = self.target_locator {
862 endpoint_builder.with_target_locator(locator);
863 } else if let Some(ref rules) = config.rewrites {
864 endpoint_builder
865 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
866 }
867
868 if let Some(inspector) = self.transport_inspector {
869 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
870 }
871
872 let endpoint = endpoint_builder.build();
873 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
874
875 let stream_engine = self.stream_engine.unwrap_or_default();
876
877 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
878 formatter
879 } else {
880 let formatter = if let Some(ref callrecord) = config.callrecord {
881 DefaultCallRecordFormatter::new_with_config(callrecord)
882 } else {
883 DefaultCallRecordFormatter::default()
884 };
885 Arc::new(formatter)
886 };
887
888 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
889 Some(sender)
890 } else if let Some(ref callrecord) = config.callrecord {
891 let builder = CallRecordManagerBuilder::new()
892 .with_cancel_token(token.child_token())
893 .with_config(callrecord.clone())
894 .with_max_concurrent(32)
895 .with_formatter(callrecord_formatter.clone());
896
897 let mut callrecord_manager = builder.build();
898 let sender = callrecord_manager.sender.clone();
899 crate::spawn(async move {
900 callrecord_manager.serve().await;
901 });
902 Some(sender)
903 } else {
904 None
905 };
906
907 let app_state = Arc::new(AppStateInner {
908 config,
909 token,
910 stream_engine,
911 callrecord_sender,
912 endpoint,
913 registration_handles: Mutex::new(HashMap::new()),
914 alive_users: Arc::new(RwLock::new(HashSet::new())),
915 dialog_layer: dialog_layer.clone(),
916 create_invitation_handler: self.create_invitation_handler,
917 invitation: Invitation::new(dialog_layer),
918 routing_state: Arc::new(crate::call::RoutingState::new()),
919 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
920 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
921 total_calls: AtomicU64::new(0),
922 total_failed_calls: AtomicU64::new(0),
923 uptime: Local::now(),
924 shutting_down: Arc::new(AtomicBool::new(false)),
925 });
926
927 Ok(app_state)
928 }
929}