1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::{
11 FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12 default_create_invite_handler,
13 },
14 public_address::{
15 LearnedPublicAddresses, LearningMessageInspector, build_public_contact_uri,
16 },
17 registration::{RegistrationHandle, UserCredential},
18 },
19};
20
21use crate::media::{cache::set_cache_dir, engine::StreamEngine};
22use anyhow::Result;
23use chrono::{DateTime, Local};
24use humantime::parse_duration;
25use rsip::prelude::HeadersExt;
26use rsipstack::transaction::{
27 Endpoint, TransactionReceiver,
28 endpoint::{TargetLocator, TransportEventInspector},
29};
30use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
31use std::collections::HashSet;
32use std::str::FromStr;
33use std::sync::{Arc, RwLock};
34use std::time::Duration;
35use std::{collections::HashMap, net::SocketAddr};
36use std::{
37 path::Path,
38 sync::atomic::{AtomicBool, AtomicU64, Ordering},
39};
40use tokio::select;
41use tokio::sync::Mutex;
42use tokio_util::sync::CancellationToken;
43use tracing::{info, warn};
44
/// Shared runtime state of the SIP user agent.
///
/// Instances are produced by [`AppStateBuilder::build`] and shared as
/// [`AppState`] (an `Arc`). Dropping the last reference calls `stop()`.
pub struct AppStateInner {
    /// Effective configuration, shared read-only.
    pub config: Arc<Config>,
    /// Root cancellation token; cancelled by `stop()` / `graceful_stop()`.
    /// Per-task tokens are derived from it with `child_token()`.
    pub token: CancellationToken,
    /// Media stream engine supplied to the builder (or its default).
    pub stream_engine: Arc<StreamEngine>,
    /// Call-record sink; `None` when neither a sender nor a `callrecord`
    /// config section was provided at build time.
    pub callrecord_sender: Option<CallRecordSender>,
    /// SIP transaction endpoint.
    pub endpoint: Endpoint,
    /// Running registrations, keyed by the AOR returned by `RegisterOption::aor()`.
    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
    /// AORs whose most recent REGISTER attempt succeeded. The registration
    /// task removes the AOR before every attempt and when it exits.
    pub alive_users: Arc<RwLock<HashSet<String>>>,
    /// Dialog layer bound to `endpoint`.
    pub dialog_layer: Arc<DialogLayer>,
    /// Optional factory for invite handlers; when `None`,
    /// `default_create_invite_handler` is used instead.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Pending-invitation registry created from `dialog_layer`.
    pub invitation: Invitation,
    /// Routing state handed to the invite handler for every new INVITE.
    pub routing_state: Arc<crate::call::RoutingState>,
    /// Entries of `(String, creation instant)`; `serve()` prunes entries older
    /// than 300 seconds once a minute.
    // NOTE(review): key/value meaning (session id -> playbook name?) is not visible here — confirm.
    pub pending_playbooks: Arc<Mutex<HashMap<String, (String, std::time::Instant)>>>,
    /// Public addresses learned from SIP traffic (used when building Contact URIs).
    pub learned_public_addresses: LearnedPublicAddresses,

    /// Currently tracked calls.
    // NOTE(review): presumably keyed by session id; maintained outside this file section.
    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
    /// Call counter, starts at 0 (updated outside this file section).
    pub total_calls: AtomicU64,
    /// Failed-call counter, starts at 0 (updated outside this file section).
    pub total_failed_calls: AtomicU64,
    /// Local timestamp captured when the state was built (i.e. the start time).
    pub uptime: DateTime<Local>,
    /// Set once shutdown begins; new INVITEs are rejected with 503 afterwards.
    pub shutting_down: Arc<AtomicBool>,
}
66
/// Reference-counted handle to the shared [`AppStateInner`].
pub type AppState = Arc<AppStateInner>;
68
/// Builder that collects optional overrides and produces an [`AppState`].
///
/// Every field is optional; `build()` falls back to defaults for anything
/// left as `None`.
pub struct AppStateBuilder {
    /// Configuration; `Config::default()` is used when absent.
    pub config: Option<Config>,
    /// Media stream engine; the `Default` engine is used when absent.
    pub stream_engine: Option<Arc<StreamEngine>>,
    /// Externally managed call-record sender. When absent and the config has a
    /// `callrecord` section, `build()` spawns its own call-record manager.
    pub callrecord_sender: Option<CallRecordSender>,
    /// Call-record formatter; a `DefaultCallRecordFormatter` is used when absent.
    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
    /// Root cancellation token; a fresh token is created when absent.
    pub cancel_token: Option<CancellationToken>,
    /// Factory for invite handlers, copied verbatim into the built state.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Path the configuration was loaded from.
    // NOTE(review): stored by `with_config_metadata` but not read by `build()` in this file.
    pub config_path: Option<String>,

    /// Custom SIP message inspector installed on the endpoint.
    pub message_inspector: Option<Box<dyn MessageInspector>>,
    /// Custom target locator; takes precedence over `config.rewrites`.
    pub target_locator: Option<Box<dyn TargetLocator>>,
    /// Custom transport event inspector installed on the endpoint.
    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
}
82
83impl AppStateInner {
84 pub fn auto_learn_public_address_enabled(&self) -> bool {
85 self.config.auto_learn_public_address.unwrap_or(false)
86 }
87
88 pub fn get_dump_events_file(&self, session_id: &String) -> String {
89 let recorder_root = self.config.recorder_path();
90 let root = Path::new(&recorder_root);
91 if !root.exists() {
92 match std::fs::create_dir_all(root) {
93 Ok(_) => {
94 info!("created dump events root: {}", root.to_string_lossy());
95 }
96 Err(e) => {
97 warn!(
98 "Failed to create dump events root: {} {}",
99 e,
100 root.to_string_lossy()
101 );
102 }
103 }
104 }
105 root.join(format!("{}.events.jsonl", session_id))
106 .to_string_lossy()
107 .to_string()
108 }
109
110 pub fn get_recorder_file(&self, session_id: &String) -> String {
111 let recorder_root = self.config.recorder_path();
112 let root = Path::new(&recorder_root);
113 if !root.exists() {
114 match std::fs::create_dir_all(root) {
115 Ok(_) => {
116 info!("created recorder root: {}", root.to_string_lossy());
117 }
118 Err(e) => {
119 warn!(
120 "Failed to create recorder root: {} {}",
121 e,
122 root.to_string_lossy()
123 );
124 }
125 }
126 }
127 let desired_ext = self.config.recorder_format().extension();
128 let mut filename = session_id.clone();
129 if !filename
130 .to_lowercase()
131 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
132 {
133 filename = format!("{}.{}", filename, desired_ext);
134 }
135 root.join(filename).to_string_lossy().to_string()
136 }
137
138 pub async fn serve(self: Arc<Self>) -> Result<()> {
139 let incoming_txs = self.endpoint.incoming_transactions()?;
140 let token = self.token.child_token();
141 let endpoint_inner = self.endpoint.inner.clone();
142 let dialog_layer = self.dialog_layer.clone();
143 let app_state_clone = self.clone();
144
145 match self.start_registration().await {
146 Ok(count) => {
147 info!("registration started, count: {}", count);
148 }
149 Err(e) => {
150 warn!("failed to start registration: {:?}", e);
151 }
152 }
153
154 let pending_cleanup_state = self.clone();
155 let pending_cleanup_token = token.clone();
156 crate::spawn(async move {
157 let mut interval = tokio::time::interval(Duration::from_secs(60));
158 let ttl = Duration::from_secs(300);
159 loop {
160 tokio::select! {
161 _ = pending_cleanup_token.cancelled() => break,
162 _ = interval.tick() => {
163 let mut pending = pending_cleanup_state.pending_playbooks.lock().await;
164 let before = pending.len();
165 pending.retain(|_, (_, created_at)| created_at.elapsed() < ttl);
166 let removed = before - pending.len();
167 if removed > 0 {
168 info!(removed, remaining = pending.len(), "cleaned up stale pending_playbooks entries");
169 }
170 }
171 }
172 }
173 });
174
175 tokio::select! {
176 _ = token.cancelled() => {
177 info!("cancelled");
178 }
179 result = endpoint_inner.serve() => {
180 if let Err(e) = result {
181 info!("endpoint serve error: {:?}", e);
182 }
183 }
184 result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
185 if let Err(e) = result {
186 info!("process incoming request error: {:?}", e);
187 }
188 },
189 }
190
191 let timeout = self
194 .config
195 .graceful_shutdown
196 .map(|_| Duration::from_secs(10));
197
198 match self.stop_registration(timeout).await {
199 Ok(_) => {
200 info!("registration stopped, waiting for clear");
201 }
202 Err(e) => {
203 warn!("failed to stop registration: {:?}", e);
204 }
205 }
206 info!("stopping");
207 Ok(())
208 }
209
210 async fn process_incoming_request(
211 self: Arc<Self>,
212 dialog_layer: Arc<DialogLayer>,
213 mut incoming: TransactionReceiver,
214 ) -> Result<()> {
215 while let Some(mut tx) = incoming.recv().await {
216 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
217 info!(?key, "received transaction");
218 if tx.original.to_header()?.tag()?.as_ref().is_some() {
219 match dialog_layer.match_dialog(&tx) {
220 Some(mut d) => {
221 crate::spawn(async move {
222 match d.handle(&mut tx).await {
223 Ok(_) => (),
224 Err(e) => {
225 info!("error handling transaction: {:?}", e);
226 }
227 }
228 });
229 continue;
230 }
231 None => {
232 info!("dialog not found: {}", tx.original);
233 match tx
234 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
235 .await
236 {
237 Ok(_) => (),
238 Err(e) => {
239 info!("error replying to request: {:?}", e);
240 }
241 }
242 continue;
243 }
244 }
245 }
246 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
248 match tx.original.method {
249 rsip::Method::Invite | rsip::Method::Ack => {
250 if self.shutting_down.load(Ordering::Relaxed) {
252 info!(?key, "rejecting INVITE during graceful shutdown");
253 match tx
254 .reply_with(
255 rsip::StatusCode::ServiceUnavailable,
256 vec![rsip::Header::Other(
257 "Reason".into(),
258 "SIP;cause=503;text=\"Server shutting down\"".into(),
259 )],
260 None,
261 )
262 .await
263 {
264 Ok(_) => (),
265 Err(e) => {
266 info!("error replying to request: {:?}", e);
267 }
268 }
269 continue;
270 }
271
272 let invitation_handler = match self.create_invitation_handler {
273 Some(ref create_invitation_handler) => {
274 create_invitation_handler(self.config.handler.as_ref()).ok()
275 }
276 _ => default_create_invite_handler(
277 self.config.handler.as_ref(),
278 Some(self.clone()),
279 ),
280 };
281 let invitation_handler = match invitation_handler {
282 Some(h) => h,
283 None => {
284 info!(?key, "no invite handler configured, rejecting INVITE");
285 match tx
286 .reply_with(
287 rsip::StatusCode::ServiceUnavailable,
288 vec![rsip::Header::Other(
289 "Reason".into(),
290 "SIP;cause=503;text=\"No invite handler configured\""
291 .into(),
292 )],
293 None,
294 )
295 .await
296 {
297 Ok(_) => (),
298 Err(e) => {
299 info!("error replying to request: {:?}", e);
300 }
301 }
302 continue;
303 }
304 };
305 let local_addr = tx
306 .connection
307 .as_ref()
308 .map(|connection| connection.get_addr().clone())
309 .or_else(|| dialog_layer.endpoint.get_addrs().first().cloned());
310 let contact_username =
311 tx.original.uri.auth.as_ref().map(|auth| auth.user.as_str());
312 let contact = local_addr.as_ref().map(|addr| {
313 build_public_contact_uri(
314 &self.learned_public_addresses,
315 self.auto_learn_public_address_enabled(),
316 addr,
317 contact_username,
318 None,
319 )
320 });
321
322 let dialog = match dialog_layer.get_or_create_server_invite(
323 &tx,
324 state_sender,
325 None,
326 contact,
327 ) {
328 Ok(d) => d,
329 Err(e) => {
330 info!("failed to obtain dialog: {:?}", e);
332 match tx
333 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
334 .await
335 {
336 Ok(_) => (),
337 Err(e) => {
338 info!("error replying to request: {:?}", e);
339 }
340 }
341 continue;
342 }
343 };
344
345 let dialog_id = dialog.id();
346 let dialog_id_str = dialog_id.to_string();
347 let token = self.token.child_token();
348 let pending_dialog = PendingDialog {
349 token: token.clone(),
350 dialog: dialog.clone(),
351 state_receiver,
352 };
353
354 let guard = Arc::new(PendingDialogGuard::new(
355 self.invitation.clone(),
356 dialog_id,
357 pending_dialog,
358 ));
359
360 let accept_timeout = self
361 .config
362 .accept_timeout
363 .as_ref()
364 .and_then(|t| parse_duration(t).ok())
365 .unwrap_or_else(|| Duration::from_secs(60));
366
367 let token_ref = token.clone();
368 let guard_ref = guard.clone();
369 crate::spawn(async move {
370 select! {
371 _ = token_ref.cancelled() => {}
372 _ = tokio::time::sleep(accept_timeout) => {}
373 }
374 guard_ref.drop_async().await;
375 });
376
377 let mut dialog_ref = dialog.clone();
378 let token_ref = token.clone();
379 let routing_state = self.routing_state.clone();
380 let dialog_for_reject = dialog.clone();
381 let guard_ref = guard.clone();
382 crate::spawn(async move {
383 let invite_loop = async {
384 match invitation_handler
385 .on_invite(
386 dialog_id_str.clone(),
387 token.clone(),
388 dialog.clone(),
389 routing_state,
390 )
391 .await
392 {
393 Ok(_) => (),
394 Err(e) => {
395 info!(id = dialog_id_str, "error handling invite: {:?}", e);
397 let reason = format!("Failed to process invite: {}", e);
398 if let Err(reject_err) = dialog_for_reject.reject(
399 Some(rsip::StatusCode::ServiceUnavailable),
400 Some(reason),
401 ) {
402 info!(
403 id = dialog_id_str,
404 "error rejecting call: {:?}", reject_err
405 );
406 }
407 token.cancel();
409 guard_ref.drop_async().await;
410 }
411 }
412 };
413 select! {
414 _ = token_ref.cancelled() => {}
415 _ = async {
416 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
417 } => {}
418 }
419 });
420 }
421 rsip::Method::Options => {
422 info!(?key, "ignoring out-of-dialog OPTIONS request");
423 continue;
424 }
425 _ => {
426 info!(?key, "received request: {:?}", tx.original.method);
427 match tx.reply(rsip::StatusCode::OK).await {
428 Ok(_) => (),
429 Err(e) => {
430 info!("error replying to request: {:?}", e);
431 }
432 }
433 }
434 }
435 }
436 Ok(())
437 }
438
439 pub fn stop(&self) {
440 if self.shutting_down.swap(true, Ordering::Relaxed) {
441 return;
442 }
443 info!("stopping, marking as shutting down");
444 self.token.cancel();
445 }
446
447 pub async fn graceful_stop(&self) -> Result<()> {
448 if self.shutting_down.swap(true, Ordering::Relaxed) {
449 return Ok(());
450 }
451
452 info!("graceful stopping, marking as shutting down");
453 let timeout = self
454 .config
455 .graceful_shutdown
456 .map(|_| Duration::from_secs(10));
457
458 self.stop_registration(timeout).await?;
459 self.token.cancel();
460 Ok(())
461 }
462
463 pub async fn start_registration(&self) -> Result<usize> {
464 let mut count = 0;
465 if let Some(register_users) = &self.config.register_users {
466 for option in register_users.iter() {
467 match self.register(option.clone()).await {
468 Ok(_) => {
469 count += 1;
470 }
471 Err(e) => {
472 warn!("failed to register user: {:?} {:?}", e, option);
473 }
474 }
475 }
476 }
477 Ok(count)
478 }
479
480 pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
481 let callee_uri = callee
482 .strip_prefix("sip:")
483 .or_else(|| callee.strip_prefix("sips:"))
484 .unwrap_or(callee);
485 let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
486 format!("sip:{}", callee_uri)
487 } else {
488 callee_uri.to_string()
489 };
490
491 let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
492 Ok(uri) => uri,
493 Err(e) => {
494 warn!("failed to parse callee URI: {} {:?}", callee, e);
495 return None;
496 }
497 };
498
499 let callee_host = match &parsed_callee.host_with_port.host {
500 rsip::Host::Domain(domain) => domain.to_string(),
501 rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
502 };
503
504 if let Some(register_users) = &self.config.register_users {
506 for option in register_users.iter() {
507 let mut server = option.server.clone();
508 if !server.starts_with("sip:") && !server.starts_with("sips:") {
509 server = format!("sip:{}", server);
510 }
511
512 let parsed_server = match rsip::Uri::try_from(server.as_str()) {
513 Ok(uri) => uri,
514 Err(e) => {
515 warn!("failed to parse server URI: {} {:?}", option.server, e);
516 continue;
517 }
518 };
519
520 let server_host = match &parsed_server.host_with_port.host {
521 rsip::Host::Domain(domain) => domain.to_string(),
522 rsip::Host::IpAddr(ip) => {
523 if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
525 if ip == callee_ip {
526 if let Some(cred) = &option.credential {
527 info!(
528 callee,
529 username = cred.username,
530 server = option.server,
531 "Auto-injecting credentials from registered user for outbound call (IP match)"
532 );
533 return Some(cred.clone());
534 }
535 }
536 }
537 continue;
538 }
539 };
540
541 if server_host == callee_host {
542 if let Some(cred) = &option.credential {
543 info!(
544 callee,
545 username = cred.username,
546 server = option.server,
547 "Auto-injecting credentials from registered user for outbound call"
548 );
549 return Some(cred.clone());
550 }
551 }
552 }
553 }
554
555 None
556 }
557
558 fn find_credentials_by_ip(
560 &self,
561 callee_ip: &std::net::IpAddr,
562 ) -> Option<crate::useragent::registration::UserCredential> {
563 if let Some(register_users) = &self.config.register_users {
564 for option in register_users.iter() {
565 let mut server = option.server.clone();
566 if !server.starts_with("sip:") && !server.starts_with("sips:") {
567 server = format!("sip:{}", server);
568 }
569
570 if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
571 if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
572 if server_ip == callee_ip {
573 if let Some(cred) = &option.credential {
574 info!(
575 callee_ip = %callee_ip,
576 username = cred.username,
577 server = option.server,
578 "Auto-injecting credentials from registered user for outbound call (IP match)"
579 );
580 return Some(cred.clone());
581 }
582 }
583 }
584 }
585 }
586 }
587 None
588 }
589
590 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
591 {
592 let mut handles = self.registration_handles.lock().await;
593 for (_, handle) in handles.iter_mut() {
594 handle.stop();
595 }
596 handles.clear();
597 }
598
599 if let Some(duration) = wait_for_clear {
600 let live_users = self.alive_users.clone();
601 let check_loop = async move {
602 loop {
603 let is_empty = {
604 let users = live_users
605 .read()
606 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
607 users.is_empty()
608 };
609 if is_empty {
610 break;
611 }
612 tokio::time::sleep(Duration::from_millis(50)).await;
613 }
614 Ok::<(), anyhow::Error>(())
615 };
616 match tokio::time::timeout(duration, check_loop).await {
617 Ok(_) => {}
618 Err(e) => {
619 warn!("failed to wait for clear: {}", e);
620 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
621 }
622 }
623 }
624 Ok(())
625 }
626
627 pub async fn register(&self, option: RegisterOption) -> Result<()> {
628 let user = option.aor();
629 let mut server = option.server.clone();
630 if !server.starts_with("sip:") && !server.starts_with("sips:") {
631 server = format!("sip:{}", server);
632 }
633 let sip_server = match rsip::Uri::try_from(server) {
634 Ok(uri) => uri,
635 Err(e) => {
636 warn!("failed to parse server: {} {:?}", e, option.server);
637 return Err(anyhow::anyhow!("failed to parse server: {}", e));
638 }
639 };
640 let cancel_token = self.token.child_token();
641 let credential = option.credential.clone().map(|c| c.into());
642 let registration = rsipstack::dialog::registration::Registration::new(
643 self.endpoint.inner.clone(),
644 credential,
645 );
646 let handle = RegistrationHandle {
647 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
648 registration: Mutex::new(registration),
649 option,
650 cancel_token,
651 start_time: Mutex::new(std::time::Instant::now()),
652 last_update: Mutex::new(std::time::Instant::now()),
653 last_response: Mutex::new(None),
654 }),
655 };
656 self.registration_handles
657 .lock()
658 .await
659 .insert(user.clone(), handle.clone());
660 tracing::debug!(user = user.as_str(), "starting registration task");
661 let alive_users = self.alive_users.clone();
662
663 crate::spawn(async move {
664 *handle.inner.start_time.lock().await = std::time::Instant::now();
665
666 select! {
667 _ = handle.inner.cancel_token.cancelled() => {
668 }
669 _ = async {
670 loop {
671 let user = handle.inner.option.aor();
672 alive_users.write().unwrap().remove(&user);
673 let refresh_time = match handle.do_register(&sip_server, None).await {
674 Ok(expires) => {
675 info!(
676 user = handle.inner.option.aor(),
677 expires = expires,
678 alive_users = alive_users.read().unwrap().len(),
679 "registration refreshed",
680 );
681 alive_users.write().unwrap().insert(user);
682 expires * 3 / 4 }
684 Err(e) => {
685 warn!(
686 user = handle.inner.option.aor(),
687 alive_users = alive_users.read().unwrap().len(),
688 "registration failed: {:?}", e);
689 60
690 }
691 };
692 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
693 }
694 } => {}
695 }
696 handle.do_register(&sip_server, Some(0)).await.ok();
697 alive_users.write().unwrap().remove(&user);
698 });
699 Ok(())
700 }
701}
702
703impl Drop for AppStateInner {
704 fn drop(&mut self) {
705 self.stop();
706 }
707}
708
709impl AppStateBuilder {
710 pub fn new() -> Self {
711 Self {
712 config: None,
713 stream_engine: None,
714 callrecord_sender: None,
715 callrecord_formatter: None,
716 cancel_token: None,
717 create_invitation_handler: None,
718 config_path: None,
719 message_inspector: None,
720 target_locator: None,
721 transport_inspector: None,
722 }
723 }
724
725 pub fn with_config(mut self, config: Config) -> Self {
726 self.config = Some(config);
727 self
728 }
729
730 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
731 self.stream_engine = Some(stream_engine);
732 self
733 }
734
735 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
736 self.callrecord_sender = Some(sender);
737 self
738 }
739
740 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
741 self.cancel_token = Some(token);
742 self
743 }
744
745 pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
746 self.config_path = path;
747 self
748 }
749
750 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
751 self.message_inspector = Some(inspector);
752 self
753 }
754 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
755 self.target_locator = Some(locator);
756 self
757 }
758
759 pub fn with_transport_inspector(
760 &mut self,
761 inspector: Box<dyn TransportEventInspector>,
762 ) -> &mut Self {
763 self.transport_inspector = Some(inspector);
764 self
765 }
766
767 pub async fn build(self) -> Result<AppState> {
768 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
769 let token = self
770 .cancel_token
771 .unwrap_or_else(|| CancellationToken::new());
772 let _ = set_cache_dir(&config.media_cache_path);
773 let learned_public_addresses = LearnedPublicAddresses::default();
774
775 let local_ip = if !config.addr.is_empty() {
776 std::net::IpAddr::from_str(config.addr.as_str())?
777 } else {
778 crate::net_tool::get_first_non_loopback_interface()?
779 };
780 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
781 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
782
783 #[cfg(unix)]
785 let std_socket = {
786 use socket2::{Domain, Protocol, SockAddr, Socket, Type};
787
788 let domain = if local_addr.is_ipv4() {
789 Domain::IPV4
790 } else {
791 Domain::IPV6
792 };
793 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
794 .map_err(|err| anyhow::anyhow!("Failed to create UDP socket: {}", err))?;
795
796 socket
797 .set_reuse_address(true)
798 .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEADDR: {}", err))?;
799
800 #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "cygwin")))]
802 socket
803 .set_reuse_port(true)
804 .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEPORT: {}", err))?;
805
806 socket
807 .bind(&SockAddr::from(local_addr))
808 .map_err(|err| anyhow::anyhow!("Failed to bind UDP socket: {}", err))?;
809
810 let std_socket: std::net::UdpSocket = socket.into();
811 std_socket
812 };
813
814 #[cfg(not(unix))]
815 let std_socket = std::net::UdpSocket::bind(local_addr)?;
816
817 std_socket.set_nonblocking(true)?;
818 let tokio_socket = tokio::net::UdpSocket::from_std(std_socket)?;
819 let actual_addr = tokio_socket.local_addr()?;
821 let bind_addr = rsipstack::transport::SipConnection::resolve_bind_address(actual_addr);
822
823 let udp_inner = rsipstack::transport::udp::UdpInner {
824 conn: tokio_socket,
825 addr: rsipstack::transport::SipAddr {
826 r#type: Some(rsip::transport::Transport::Udp),
827 addr: bind_addr.into(),
828 },
829 };
830
831 let external = config
832 .external_ip
833 .as_ref()
834 .map(|ip| {
835 format!("{}:{}", ip, actual_addr.port())
836 .parse()
837 .map_err(|e| anyhow::anyhow!("Failed to parse external address: {}", e))
838 })
839 .transpose()?;
840
841 let udp_conn = rsipstack::transport::udp::UdpConnection::attach(
842 udp_inner,
843 external,
844 Some(token.child_token()),
845 )
846 .await;
847
848 info!(
849 "start useragent, addr: {} (SO_REUSEPORT enabled)",
850 udp_conn.get_addr()
851 );
852
853 transport_layer.add_transport(udp_conn.into());
854
855 if let Some(tls_port) = config.tls_port {
857 let tls_addr: std::net::SocketAddr = format!("{}:{}", local_ip, tls_port).parse()?;
858 let tls_sip_addr = rsipstack::transport::SipAddr {
859 r#type: Some(rsip::transport::Transport::Tls),
860 addr: tls_addr.into(),
861 };
862 let mut tls_cfg = rsipstack::transport::tls::TlsConfig::default();
863 if let Some(ref cert_path) = config.tls_cert_file {
864 tls_cfg.cert = Some(
865 std::fs::read(cert_path)
866 .map_err(|e| anyhow::anyhow!("tls_cert_file: {}", e))?,
867 );
868 }
869 if let Some(ref key_path) = config.tls_key_file {
870 tls_cfg.key = Some(
871 std::fs::read(key_path).map_err(|e| anyhow::anyhow!("tls_key_file: {}", e))?,
872 );
873 }
874 let external_tls_addr = config
875 .external_ip
876 .as_ref()
877 .and_then(|ip| format!("{}:{}", ip, tls_port).parse().ok());
878 match rsipstack::transport::tls::TlsListenerConnection::new(
879 tls_sip_addr,
880 external_tls_addr,
881 tls_cfg,
882 )
883 .await
884 {
885 Ok(tls_conn) => {
886 transport_layer.add_transport(tls_conn.into());
887 info!("TLS SIP transport started on {}:{}", local_ip, tls_port);
888 }
889 Err(e) => {
890 return Err(anyhow::anyhow!("Failed to start TLS SIP transport: {}", e));
891 }
892 }
893 }
894
895 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
896 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
897 if let Some(ref user_agent) = config.useragent {
898 endpoint_builder.with_user_agent(user_agent.as_str());
899 }
900
901 let mut endpoint_builder = endpoint_builder
902 .with_cancel_token(token.child_token())
903 .with_transport_layer(transport_layer)
904 .with_option(endpoint_option);
905
906 if config.auto_learn_public_address.unwrap_or(false) {
907 endpoint_builder =
908 endpoint_builder.with_inspector(Box::new(LearningMessageInspector::new(
909 learned_public_addresses.clone(),
910 self.message_inspector,
911 )));
912 } else if let Some(inspector) = self.message_inspector {
913 endpoint_builder = endpoint_builder.with_inspector(inspector);
914 }
915
916 if let Some(locator) = self.target_locator {
917 endpoint_builder.with_target_locator(locator);
918 } else if let Some(ref rules) = config.rewrites {
919 endpoint_builder
920 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
921 }
922
923 if let Some(inspector) = self.transport_inspector {
924 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
925 }
926
927 let endpoint = endpoint_builder.build();
928 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
929
930 let stream_engine = self.stream_engine.unwrap_or_default();
931
932 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
933 formatter
934 } else {
935 let formatter = if let Some(ref callrecord) = config.callrecord {
936 DefaultCallRecordFormatter::new_with_config(callrecord)
937 } else {
938 DefaultCallRecordFormatter::default()
939 };
940 Arc::new(formatter)
941 };
942
943 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
944 Some(sender)
945 } else if let Some(ref callrecord) = config.callrecord {
946 let builder = CallRecordManagerBuilder::new()
947 .with_cancel_token(token.child_token())
948 .with_config(callrecord.clone())
949 .with_max_concurrent(32)
950 .with_formatter(callrecord_formatter.clone());
951
952 let mut callrecord_manager = builder.build();
953 let sender = callrecord_manager.sender.clone();
954 crate::spawn(async move {
955 callrecord_manager.serve().await;
956 });
957 Some(sender)
958 } else {
959 None
960 };
961
962 let app_state = Arc::new(AppStateInner {
963 config,
964 token,
965 stream_engine,
966 callrecord_sender,
967 endpoint,
968 registration_handles: Mutex::new(HashMap::new()),
969 alive_users: Arc::new(RwLock::new(HashSet::new())),
970 dialog_layer: dialog_layer.clone(),
971 create_invitation_handler: self.create_invitation_handler,
972 invitation: Invitation::new(dialog_layer),
973 routing_state: Arc::new(crate::call::RoutingState::new()),
974 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
975 learned_public_addresses,
976 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
977 total_calls: AtomicU64::new(0),
978 total_failed_calls: AtomicU64::new(0),
979 uptime: Local::now(),
980 shutting_down: Arc::new(AtomicBool::new(false)),
981 });
982
983 Ok(app_state)
984 }
985}