1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::{
11 FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12 default_create_invite_handler,
13 },
14 public_address::{
15 LearningMessageInspector, SharedPublicAddress, build_contact, build_public_contact_uri,
16 find_local_addr_for_uri,
17 },
18 registration::{RegistrationHandle, UserCredential},
19 },
20};
21
22use crate::media::{cache::set_cache_dir, engine::StreamEngine};
23use anyhow::Result;
24use arc_swap::ArcSwap;
25use chrono::{DateTime, Local};
26use futures::FutureExt;
27use humantime::parse_duration;
28use rsipstack::rsip::prelude::HeadersExt;
29use rsipstack::transaction::{
30 Endpoint, TransactionReceiver,
31 endpoint::{TargetLocator, TransportEventInspector},
32};
33use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
34use std::future::pending;
35use std::str::FromStr;
36use std::sync::{Arc, RwLock};
37use std::time::Duration;
38use std::{collections::HashMap, net::SocketAddr};
39use std::{collections::HashSet, time::Instant};
40use std::{
41 path::Path,
42 sync::atomic::{AtomicBool, AtomicU64, Ordering},
43};
44use tokio::select;
45use tokio::sync::Mutex;
46use tokio_util::sync::CancellationToken;
47use tracing::{info, warn};
48
49pub struct AppStateInner {
50 pub config: Arc<Config>,
51 pub token: CancellationToken,
52 pub stream_engine: Arc<StreamEngine>,
53 pub callrecord_sender: Option<CallRecordSender>,
54 pub endpoint: Endpoint,
55 pub registration_handles: Mutex<HashMap<String, CancellationToken>>,
56 pub alive_users: Arc<RwLock<HashSet<String>>>,
57 pub dialog_layer: Arc<DialogLayer>,
58 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
59 pub invitation: Invitation,
60 pub routing_state: Arc<crate::call::RoutingState>,
61 pub pending_playbooks: Arc<Mutex<HashMap<String, (String, Instant)>>>,
62 pub learned_public_address: SharedPublicAddress,
63
64 pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
65 pub total_calls: AtomicU64,
66 pub total_failed_calls: AtomicU64,
67 pub uptime: DateTime<Local>,
68 pub shutting_down: Arc<AtomicBool>,
69}
70
71pub type AppState = Arc<AppStateInner>;
72
73pub struct AppStateBuilder {
74 pub config: Option<Config>,
75 pub stream_engine: Option<Arc<StreamEngine>>,
76 pub callrecord_sender: Option<CallRecordSender>,
77 pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
78 pub cancel_token: Option<CancellationToken>,
79 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
80 pub config_path: Option<String>,
81
82 pub message_inspector: Option<Box<dyn MessageInspector>>,
83 pub target_locator: Option<Box<dyn TargetLocator>>,
84 pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
85}
86
87impl AppStateInner {
88 pub fn auto_learn_public_address_enabled(&self) -> bool {
89 self.config.auto_learn_public_address.unwrap_or(false)
90 }
91
92 pub fn get_dump_events_file(&self, session_id: &String) -> String {
93 let recorder_root = self.config.recorder_path();
94 let root = Path::new(&recorder_root);
95 if !root.exists() {
96 match std::fs::create_dir_all(root) {
97 Ok(_) => {
98 info!("created dump events root: {}", root.to_string_lossy());
99 }
100 Err(e) => {
101 warn!(
102 "Failed to create dump events root: {} {}",
103 e,
104 root.to_string_lossy()
105 );
106 }
107 }
108 }
109 root.join(format!("{}.events.jsonl", session_id))
110 .to_string_lossy()
111 .to_string()
112 }
113
114 pub fn get_recorder_file(&self, session_id: &String) -> String {
115 let recorder_root = self.config.recorder_path();
116 let root = Path::new(&recorder_root);
117 if !root.exists() {
118 match std::fs::create_dir_all(root) {
119 Ok(_) => {
120 info!("created recorder root: {}", root.to_string_lossy());
121 }
122 Err(e) => {
123 warn!(
124 "Failed to create recorder root: {} {}",
125 e,
126 root.to_string_lossy()
127 );
128 }
129 }
130 }
131 let desired_ext = self.config.recorder_format().extension();
132 let mut filename = session_id.clone();
133 if !filename
134 .to_lowercase()
135 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
136 {
137 filename = format!("{}.{}", filename, desired_ext);
138 }
139 root.join(filename).to_string_lossy().to_string()
140 }
141
142 pub async fn serve(self: Arc<Self>) -> Result<()> {
143 let incoming_txs = self.endpoint.incoming_transactions()?;
144 let token = self.token.child_token();
145 let endpoint_inner = self.endpoint.inner.clone();
146 let dialog_layer = self.dialog_layer.clone();
147 let app_state_clone = self.clone();
148
149 match self.start_registration().await {
150 Ok(count) => {
151 info!("registration started, count: {}", count);
152 }
153 Err(e) => {
154 warn!("failed to start registration: {:?}", e);
155 }
156 }
157
158 let pending_cleanup_state = self.clone();
159 let pending_cleanup_token = token.clone();
160 crate::spawn(async move {
161 let mut interval = tokio::time::interval(Duration::from_secs(60));
162 let ttl = Duration::from_secs(300);
163 loop {
164 tokio::select! {
165 _ = pending_cleanup_token.cancelled() => break,
166 _ = interval.tick() => {
167 let mut pending = pending_cleanup_state.pending_playbooks.lock().await;
168 let before = pending.len();
169 pending.retain(|_, (_, created_at)| created_at.elapsed() < ttl);
170 let removed = before - pending.len();
171 if removed > 0 {
172 info!(removed, remaining = pending.len(), "cleaned up stale pending_playbooks entries");
173 }
174 }
175 }
176 }
177 });
178
179 tokio::select! {
180 _ = token.cancelled() => {
181 info!("cancelled");
182 }
183 result = endpoint_inner.serve() => {
184 if let Err(e) = result {
185 info!("endpoint serve error: {:?}", e);
186 }
187 }
188 result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
189 if let Err(e) = result {
190 info!("process incoming request error: {:?}", e);
191 }
192 },
193 }
194
195 let timeout = self
198 .config
199 .graceful_shutdown
200 .map(|_| Duration::from_secs(10));
201
202 match self.stop_registration(timeout).await {
203 Ok(_) => {
204 info!("registration stopped, waiting for clear");
205 }
206 Err(e) => {
207 warn!("failed to stop registration: {:?}", e);
208 }
209 }
210 info!("stopping");
211 Ok(())
212 }
213
214 async fn process_incoming_request(
215 self: Arc<Self>,
216 dialog_layer: Arc<DialogLayer>,
217 mut incoming: TransactionReceiver,
218 ) -> Result<()> {
219 while let Some(mut tx) = incoming.recv().await {
220 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
221 info!(?key, "received transaction");
222 if tx.original.to_header()?.tag()?.as_ref().is_some() {
223 match dialog_layer.match_dialog(&tx) {
224 Some(mut d) => {
225 crate::spawn(async move {
226 match d.handle(&mut tx).await {
227 Ok(_) => (),
228 Err(e) => {
229 info!("error handling transaction: {:?}", e);
230 }
231 }
232 });
233 continue;
234 }
235 None => {
236 info!("dialog not found: {}", tx.original);
237 match tx
238 .reply(rsipstack::rsip::StatusCode::CallTransactionDoesNotExist)
239 .await
240 {
241 Ok(_) => (),
242 Err(e) => {
243 info!("error replying to request: {:?}", e);
244 }
245 }
246 continue;
247 }
248 }
249 }
250 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
252 match tx.original.method {
253 rsipstack::rsip::Method::Invite | rsipstack::rsip::Method::Ack => {
254 if self.shutting_down.load(Ordering::Relaxed) {
256 info!(?key, "rejecting INVITE during graceful shutdown");
257 match tx
258 .reply_with(
259 rsipstack::rsip::StatusCode::ServiceUnavailable,
260 vec![rsipstack::rsip::Header::Other(
261 "Reason".into(),
262 "SIP;cause=503;text=\"Server shutting down\"".into(),
263 )],
264 None,
265 )
266 .await
267 {
268 Ok(_) => (),
269 Err(e) => {
270 info!("error replying to request: {:?}", e);
271 }
272 }
273 continue;
274 }
275
276 let invitation_handler = match self.create_invitation_handler {
277 Some(ref create_invitation_handler) => {
278 create_invitation_handler(self.config.handler.as_ref()).ok()
279 }
280 _ => default_create_invite_handler(
281 self.config.handler.as_ref(),
282 Some(self.clone()),
283 ),
284 };
285 let invitation_handler = match invitation_handler {
286 Some(h) => h,
287 None => {
288 info!(?key, "no invite handler configured, rejecting INVITE");
289 match tx
290 .reply_with(
291 rsipstack::rsip::StatusCode::ServiceUnavailable,
292 vec![rsipstack::rsip::Header::Other(
293 "Reason".into(),
294 "SIP;cause=503;text=\"No invite handler configured\""
295 .into(),
296 )],
297 None,
298 )
299 .await
300 {
301 Ok(_) => (),
302 Err(e) => {
303 info!("error replying to request: {:?}", e);
304 }
305 }
306 continue;
307 }
308 };
309 let local_addr = tx
310 .connection
311 .as_ref()
312 .map(|connection| connection.get_addr().clone())
313 .or_else(|| dialog_layer.endpoint.get_addrs().first().cloned());
314 let contact_username =
315 tx.original.uri.auth.as_ref().map(|auth| auth.user.as_str());
316 let contact = local_addr.as_ref().map(|addr| {
317 build_public_contact_uri(
318 &self.learned_public_address,
319 self.auto_learn_public_address_enabled(),
320 addr,
321 contact_username,
322 None,
323 )
324 });
325
326 let dialog = match dialog_layer.get_or_create_server_invite(
327 &tx,
328 state_sender,
329 None,
330 contact,
331 ) {
332 Ok(d) => d,
333 Err(e) => {
334 info!("failed to obtain dialog: {:?}", e);
336 match tx
337 .reply(rsipstack::rsip::StatusCode::CallTransactionDoesNotExist)
338 .await
339 {
340 Ok(_) => (),
341 Err(e) => {
342 info!("error replying to request: {:?}", e);
343 }
344 }
345 continue;
346 }
347 };
348
349 let dialog_id = dialog.id();
350 let dialog_id_str = dialog_id.to_string();
351 let token = self.token.child_token();
352 let pending_dialog = PendingDialog {
353 token: token.clone(),
354 dialog: dialog.clone(),
355 state_receiver,
356 };
357
358 let guard = Arc::new(PendingDialogGuard::new(
359 self.invitation.clone(),
360 dialog_id,
361 pending_dialog,
362 ));
363
364 let accept_timeout = self
365 .config
366 .accept_timeout
367 .as_ref()
368 .and_then(|t| parse_duration(t).ok())
369 .unwrap_or_else(|| Duration::from_secs(60));
370
371 let token_ref = token.clone();
372 let guard_ref = guard.clone();
373 crate::spawn(async move {
374 select! {
375 _ = token_ref.cancelled() => {}
376 _ = tokio::time::sleep(accept_timeout) => {}
377 }
378 guard_ref.drop_async().await;
379 });
380
381 let mut dialog_ref = dialog.clone();
382 let token_ref = token.clone();
383 let routing_state = self.routing_state.clone();
384 let dialog_for_reject = dialog.clone();
385 let guard_ref = guard.clone();
386 crate::spawn(async move {
387 let invite_loop = async {
388 match invitation_handler
389 .on_invite(
390 dialog_id_str.clone(),
391 token.clone(),
392 dialog.clone(),
393 routing_state,
394 )
395 .await
396 {
397 Ok(_) => (),
398 Err(e) => {
399 info!(id = dialog_id_str, "error handling invite: {:?}", e);
401 let reason = format!("Failed to process invite: {}", e);
402 if let Err(reject_err) = dialog_for_reject.reject(
403 Some(rsipstack::rsip::StatusCode::ServiceUnavailable),
404 Some(reason),
405 ) {
406 info!(
407 id = dialog_id_str,
408 "error rejecting call: {:?}", reject_err
409 );
410 }
411 token.cancel();
413 guard_ref.drop_async().await;
414 }
415 }
416 };
417 select! {
418 _ = token_ref.cancelled() => {}
419 _ = async {
420 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
421 } => {}
422 }
423 });
424 }
425 rsipstack::rsip::Method::Options => {
426 info!(?key, "ignoring out-of-dialog OPTIONS request");
427 continue;
428 }
429 _ => {
430 info!(?key, "received request: {:?}", tx.original.method);
431 match tx.reply(rsipstack::rsip::StatusCode::OK).await {
432 Ok(_) => (),
433 Err(e) => {
434 info!("error replying to request: {:?}", e);
435 }
436 }
437 }
438 }
439 }
440 Ok(())
441 }
442
443 pub fn stop(&self) {
444 if self.shutting_down.swap(true, Ordering::Relaxed) {
445 return;
446 }
447 info!("stopping, marking as shutting down");
448 self.token.cancel();
449 }
450
451 pub async fn graceful_stop(&self) -> Result<()> {
452 if self.shutting_down.swap(true, Ordering::Relaxed) {
453 return Ok(());
454 }
455
456 info!("graceful stopping, marking as shutting down");
457 let timeout = self
458 .config
459 .graceful_shutdown
460 .map(|_| Duration::from_secs(10));
461
462 self.stop_registration(timeout).await?;
463 self.token.cancel();
464 Ok(())
465 }
466
467 pub async fn start_registration(&self) -> Result<usize> {
468 let mut count = 0;
469 if let Some(register_users) = &self.config.register_users {
470 for option in register_users.iter() {
471 match self.register(option.clone()).await {
472 Ok(_) => {
473 count += 1;
474 }
475 Err(e) => {
476 warn!("failed to register user: {:?} {:?}", e, option);
477 }
478 }
479 }
480 }
481 Ok(count)
482 }
483
484 pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
485 let callee_uri = callee
486 .strip_prefix("sip:")
487 .or_else(|| callee.strip_prefix("sips:"))
488 .unwrap_or(callee);
489 let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
490 format!("sip:{}", callee_uri)
491 } else {
492 callee_uri.to_string()
493 };
494
495 let parsed_callee = match rsipstack::rsip::Uri::try_from(callee_uri.as_str()) {
496 Ok(uri) => uri,
497 Err(e) => {
498 warn!("failed to parse callee URI: {} {:?}", callee, e);
499 return None;
500 }
501 };
502
503 let callee_host = match &parsed_callee.host_with_port.host {
504 rsipstack::rsip::Host::Domain(domain) => domain.to_string(),
505 rsipstack::rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
506 };
507
508 if let Some(register_users) = &self.config.register_users {
510 for option in register_users.iter() {
511 let mut server = option.server.clone();
512 if !server.starts_with("sip:") && !server.starts_with("sips:") {
513 server = format!("sip:{}", server);
514 }
515
516 let parsed_server = match rsipstack::rsip::Uri::try_from(server.as_str()) {
517 Ok(uri) => uri,
518 Err(e) => {
519 warn!("failed to parse server URI: {} {:?}", option.server, e);
520 continue;
521 }
522 };
523
524 let server_host = match &parsed_server.host_with_port.host {
525 rsipstack::rsip::Host::Domain(domain) => domain.to_string(),
526 rsipstack::rsip::Host::IpAddr(ip) => {
527 if let rsipstack::rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
529 if ip == callee_ip {
530 if let Some(cred) = &option.credential {
531 info!(
532 callee,
533 username = cred.username,
534 server = option.server,
535 "Auto-injecting credentials from registered user for outbound call (IP match)"
536 );
537 return Some(cred.clone());
538 }
539 }
540 }
541 continue;
542 }
543 };
544
545 if server_host == callee_host {
546 if let Some(cred) = &option.credential {
547 info!(
548 callee,
549 username = cred.username,
550 server = option.server,
551 "Auto-injecting credentials from registered user for outbound call"
552 );
553 return Some(cred.clone());
554 }
555 }
556 }
557 }
558
559 None
560 }
561
562 fn find_credentials_by_ip(
564 &self,
565 callee_ip: &std::net::IpAddr,
566 ) -> Option<crate::useragent::registration::UserCredential> {
567 if let Some(register_users) = &self.config.register_users {
568 for option in register_users.iter() {
569 let mut server = option.server.clone();
570 if !server.starts_with("sip:") && !server.starts_with("sips:") {
571 server = format!("sip:{}", server);
572 }
573
574 if let Ok(parsed_server) = rsipstack::rsip::Uri::try_from(server.as_str()) {
575 if let rsipstack::rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
576 if server_ip == callee_ip {
577 if let Some(cred) = &option.credential {
578 info!(
579 callee_ip = %callee_ip,
580 username = cred.username,
581 server = option.server,
582 "Auto-injecting credentials from registered user for outbound call (IP match)"
583 );
584 return Some(cred.clone());
585 }
586 }
587 }
588 }
589 }
590 }
591 None
592 }
593
594 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
595 {
596 let mut handles = self.registration_handles.lock().await;
597 for (_, cancel_token) in handles.drain() {
598 cancel_token.cancel();
599 }
600 }
601
602 if let Some(duration) = wait_for_clear {
603 let live_users = self.alive_users.clone();
604 let check_loop = async move {
605 loop {
606 let is_empty = {
607 let users = live_users
608 .read()
609 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
610 users.is_empty()
611 };
612 if is_empty {
613 break;
614 }
615 tokio::time::sleep(Duration::from_millis(50)).await;
616 }
617 Ok::<(), anyhow::Error>(())
618 };
619 match tokio::time::timeout(duration, check_loop).await {
620 Ok(_) => {}
621 Err(e) => {
622 warn!("failed to wait for clear: {}", e);
623 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
624 }
625 }
626 }
627 Ok(())
628 }
629
630 pub async fn register(&self, option: RegisterOption) -> Result<()> {
631 let user = option.aor();
632 let mut server = option.server.clone();
633 if !server.starts_with("sip:") && !server.starts_with("sips:") {
634 server = format!("sip:{}", server);
635 }
636 let sip_server = match rsipstack::rsip::Uri::try_from(server) {
637 Ok(uri) => uri,
638 Err(e) => {
639 warn!("failed to parse server: {} {:?}", e, option.server);
640 return Err(anyhow::anyhow!("failed to parse server: {}", e));
641 }
642 };
643 let cancel_token = self.token.child_token();
644 let credential = option.credential.clone().map(|c| c.into());
645 let registration = rsipstack::dialog::registration::Registration::new(
646 self.endpoint.inner.clone(),
647 credential,
648 );
649 let mut handle = RegistrationHandle {
650 registration,
651 option,
652 cancel_token: cancel_token.clone(),
653 start_time: Instant::now(),
654 last_update: Instant::now(),
655 last_response: None,
656 };
657 self.registration_handles
658 .lock()
659 .await
660 .insert(user.clone(), cancel_token);
661 tracing::debug!(user = user.as_str(), "starting registration task");
662 let alive_users = self.alive_users.clone();
663
664 crate::spawn(async move {
665 handle.start_time = Instant::now();
666 let cancel_token = handle.cancel_token.clone();
667 let addrs = handle.registration.endpoint.get_addrs();
668 let local_bind_addr = if let Some(addr) = find_local_addr_for_uri(&addrs, &sip_server) {
669 addr
670 } else {
671 warn!(
672 user = user.as_str(),
673 server = %sip_server,
674 "failed to get local bind address for registration transport"
675 );
676 alive_users.write().unwrap().remove(&user);
677 return;
678 };
679 let user = handle.option.aor();
680 alive_users.write().unwrap().remove(&user);
681 let mut contact_address = local_bind_addr.addr.clone();
682 let mut contact = build_contact(
683 &local_bind_addr,
684 Some(contact_address.clone()),
685 Some(handle.option.username.as_str()),
686 None,
687 );
688 let mut should_register = true;
689 let mut timer = pending().boxed();
690
691 loop {
692 select! {
693 _ = cancel_token.cancelled() => {
694 break;
695 }
696 _ = timer.as_mut(), if !should_register => {
697 should_register = true;
698 timer = Box::pin(pending());
699 }
700 result = handle.do_register(&sip_server, None, &contact), if should_register => {
701 match result {
702 Ok((expires, new_addr)) => {
703 if handle
704 .should_retry_registration_now(
705 &local_bind_addr,
706 &contact_address,
707 new_addr.as_ref(),
708 )
709 {
710 if let Some(next_contact_address) = new_addr {
711 info!(
712 user = user.as_str(),
713 current_contact = %contact_address,
714 next_contact = %next_contact_address,
715 "public address changed, retrying registration immediately",
716 );
717 contact_address = next_contact_address;
718 contact = build_contact(
719 &local_bind_addr,
720 Some(contact_address.clone()),
721 Some(handle.option.username.as_str()),
722 None,
723 );
724 continue;
725 }
726 }
727 info!(
728 user = user.as_str(),
729 expires = expires,
730 contact = %contact_address,
731 alive_users = alive_users.read().unwrap().len(),
732 "registration refreshed",
733 );
734 alive_users.write().unwrap().insert(user.clone());
735 should_register = false;
736 timer = Box::pin(tokio::time::sleep(Duration::from_secs(
737 (expires * 3 / 4) as u64,
738 )));
739 }
740 Err(e) => {
741 warn!(
742 user = user.as_str(),
743 alive_users = alive_users.read().unwrap().len(),
744 "registration failed: {:?}", e
745 );
746 should_register = false;
747 timer = Box::pin(tokio::time::sleep(Duration::from_secs(60)));
748 }
749 }
750 }
751 }
752 }
753 handle
754 .do_register(&sip_server, Some(0), &contact)
755 .await
756 .ok();
757 alive_users.write().unwrap().remove(&user);
758 });
759 Ok(())
760 }
761}
762
763impl Drop for AppStateInner {
764 fn drop(&mut self) {
765 self.stop();
766 }
767}
768
769impl AppStateBuilder {
770 pub fn new() -> Self {
771 Self {
772 config: None,
773 stream_engine: None,
774 callrecord_sender: None,
775 callrecord_formatter: None,
776 cancel_token: None,
777 create_invitation_handler: None,
778 config_path: None,
779 message_inspector: None,
780 target_locator: None,
781 transport_inspector: None,
782 }
783 }
784
785 pub fn with_config(mut self, config: Config) -> Self {
786 self.config = Some(config);
787 self
788 }
789
790 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
791 self.stream_engine = Some(stream_engine);
792 self
793 }
794
795 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
796 self.callrecord_sender = Some(sender);
797 self
798 }
799
800 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
801 self.cancel_token = Some(token);
802 self
803 }
804
805 pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
806 self.config_path = path;
807 self
808 }
809
810 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
811 self.message_inspector = Some(inspector);
812 self
813 }
814 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
815 self.target_locator = Some(locator);
816 self
817 }
818
819 pub fn with_transport_inspector(
820 &mut self,
821 inspector: Box<dyn TransportEventInspector>,
822 ) -> &mut Self {
823 self.transport_inspector = Some(inspector);
824 self
825 }
826
827 pub async fn build(self) -> Result<AppState> {
828 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
829 let token = self
830 .cancel_token
831 .unwrap_or_else(|| CancellationToken::new());
832 let _ = set_cache_dir(&config.media_cache_path);
833 let local_ip = if !config.addr.is_empty() {
834 std::net::IpAddr::from_str(config.addr.as_str())?
835 } else {
836 crate::net_tool::get_first_non_loopback_interface()?
837 };
838 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
839 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
840
841 #[cfg(unix)]
843 let std_socket = {
844 use socket2::{Domain, Protocol, SockAddr, Socket, Type};
845
846 let domain = if local_addr.is_ipv4() {
847 Domain::IPV4
848 } else {
849 Domain::IPV6
850 };
851 let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
852 .map_err(|err| anyhow::anyhow!("Failed to create UDP socket: {}", err))?;
853
854 socket
855 .set_reuse_address(true)
856 .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEADDR: {}", err))?;
857
858 #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "cygwin")))]
860 socket
861 .set_reuse_port(true)
862 .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEPORT: {}", err))?;
863
864 socket
865 .bind(&SockAddr::from(local_addr))
866 .map_err(|err| anyhow::anyhow!("Failed to bind UDP socket: {}", err))?;
867
868 let std_socket: std::net::UdpSocket = socket.into();
869 std_socket
870 };
871
872 #[cfg(not(unix))]
873 let std_socket = std::net::UdpSocket::bind(local_addr)?;
874
875 std_socket.set_nonblocking(true)?;
876 let tokio_socket = tokio::net::UdpSocket::from_std(std_socket)?;
877 let actual_addr = tokio_socket.local_addr()?;
879 let bind_addr = rsipstack::transport::SipConnection::resolve_bind_address(actual_addr);
880 let mut learned_public_address: SharedPublicAddress =
881 Arc::new(ArcSwap::from_pointee(bind_addr.into()));
882
883 let udp_inner = rsipstack::transport::udp::UdpInner {
884 conn: tokio_socket,
885 addr: rsipstack::transport::SipAddr {
886 r#type: Some(rsipstack::rsip::transport::Transport::Udp),
887 addr: bind_addr.into(),
888 },
889 };
890
891 let external = config
892 .external_ip
893 .as_ref()
894 .map(|ip| {
895 format!("{}:{}", ip, actual_addr.port())
896 .parse()
897 .map_err(|e| anyhow::anyhow!("Failed to parse external address: {}", e))
898 })
899 .transpose()?;
900
901 let udp_conn = rsipstack::transport::udp::UdpConnection::attach(
902 udp_inner,
903 external,
904 Some(token.child_token()),
905 )
906 .await;
907
908 info!(
909 "start useragent, addr: {} (SO_REUSEPORT enabled)",
910 udp_conn.get_addr()
911 );
912
913 transport_layer.add_transport(udp_conn.into());
914
915 if let Some(tls_port) = config.tls_port {
917 let tls_addr: std::net::SocketAddr = format!("{}:{}", local_ip, tls_port).parse()?;
918 let tls_sip_addr = rsipstack::transport::SipAddr {
919 r#type: Some(rsipstack::rsip::transport::Transport::Tls),
920 addr: tls_addr.into(),
921 };
922 let mut tls_cfg = rsipstack::transport::tls::TlsConfig::default();
923 if let Some(ref cert_path) = config.tls_cert_file {
924 tls_cfg.cert = Some(
925 std::fs::read(cert_path)
926 .map_err(|e| anyhow::anyhow!("tls_cert_file: {}", e))?,
927 );
928 }
929 if let Some(ref key_path) = config.tls_key_file {
930 tls_cfg.key = Some(
931 std::fs::read(key_path).map_err(|e| anyhow::anyhow!("tls_key_file: {}", e))?,
932 );
933 }
934 let external_tls_addr = config
935 .external_ip
936 .as_ref()
937 .and_then(|ip| format!("{}:{}", ip, tls_port).parse().ok());
938 match rsipstack::transport::tls::TlsListenerConnection::new(
939 tls_sip_addr,
940 external_tls_addr,
941 tls_cfg,
942 )
943 .await
944 {
945 Ok(tls_conn) => {
946 transport_layer.add_transport(tls_conn.into());
947 info!("TLS SIP transport started on {}:{}", local_ip, tls_port);
948 }
949 Err(e) => {
950 return Err(anyhow::anyhow!("Failed to start TLS SIP transport: {}", e));
951 }
952 }
953 }
954
955 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
956 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
957 if let Some(ref user_agent) = config.useragent {
958 endpoint_builder.with_user_agent(user_agent.as_str());
959 }
960
961 let mut endpoint_builder = endpoint_builder
962 .with_cancel_token(token.child_token())
963 .with_transport_layer(transport_layer)
964 .with_option(endpoint_option);
965
966 if config.auto_learn_public_address.unwrap_or_default() {
967 let inspector = LearningMessageInspector::new(bind_addr.into(), self.message_inspector);
968 learned_public_address = inspector.shared_public_address();
969 endpoint_builder = endpoint_builder.with_inspector(Box::new(inspector));
970 } else if let Some(inspector) = self.message_inspector {
971 endpoint_builder = endpoint_builder.with_inspector(inspector);
972 }
973
974 if let Some(locator) = self.target_locator {
975 endpoint_builder.with_target_locator(locator);
976 } else if let Some(ref rules) = config.rewrites {
977 endpoint_builder
978 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
979 }
980
981 if let Some(inspector) = self.transport_inspector {
982 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
983 }
984
985 let endpoint = endpoint_builder.build();
986 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
987
988 let stream_engine = self.stream_engine.unwrap_or_default();
989
990 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
991 formatter
992 } else {
993 let formatter = if let Some(ref callrecord) = config.callrecord {
994 DefaultCallRecordFormatter::new_with_config(callrecord)
995 } else {
996 DefaultCallRecordFormatter::default()
997 };
998 Arc::new(formatter)
999 };
1000
1001 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
1002 Some(sender)
1003 } else if let Some(ref callrecord) = config.callrecord {
1004 let builder = CallRecordManagerBuilder::new()
1005 .with_cancel_token(token.child_token())
1006 .with_config(callrecord.clone())
1007 .with_max_concurrent(32)
1008 .with_formatter(callrecord_formatter.clone());
1009
1010 let mut callrecord_manager = builder.build();
1011 let sender = callrecord_manager.sender.clone();
1012 crate::spawn(async move {
1013 callrecord_manager.serve().await;
1014 });
1015 Some(sender)
1016 } else {
1017 None
1018 };
1019
1020 let app_state = Arc::new(AppStateInner {
1021 config,
1022 token,
1023 stream_engine,
1024 callrecord_sender,
1025 endpoint,
1026 registration_handles: Mutex::new(HashMap::new()),
1027 alive_users: Arc::new(RwLock::new(HashSet::new())),
1028 dialog_layer: dialog_layer.clone(),
1029 create_invitation_handler: self.create_invitation_handler,
1030 invitation: Invitation::new(dialog_layer),
1031 routing_state: Arc::new(crate::call::RoutingState::new()),
1032 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
1033 learned_public_address,
1034 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
1035 total_calls: AtomicU64::new(0),
1036 total_failed_calls: AtomicU64::new(0),
1037 uptime: Local::now(),
1038 shutting_down: Arc::new(AtomicBool::new(false)),
1039 });
1040
1041 Ok(app_state)
1042 }
1043}