Skip to main content

active_call/
app.rs

1use crate::{
2    call::{ActiveCallRef, sip::Invitation},
3    callrecord::{
4        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5    },
6    config::Config,
7    locator::RewriteTargetLocator,
8    useragent::{
9        RegisterOption,
10        invitation::{
11            FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12            default_create_invite_handler,
13        },
14        registration::{RegistrationHandle, UserCredential},
15    },
16};
17
18use crate::media::{cache::set_cache_dir, engine::StreamEngine};
19use anyhow::Result;
20use chrono::{DateTime, Local};
21use humantime::parse_duration;
22use rsip::prelude::HeadersExt;
23use rsipstack::transaction::{
24    Endpoint, TransactionReceiver,
25    endpoint::{TargetLocator, TransportEventInspector},
26};
27use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
28use std::collections::HashSet;
29use std::str::FromStr;
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use std::{collections::HashMap, net::SocketAddr};
33use std::{
34    path::Path,
35    sync::atomic::{AtomicBool, AtomicU64, Ordering},
36};
37use tokio::select;
38use tokio::sync::Mutex;
39use tokio_util::sync::CancellationToken;
40use tracing::{info, warn};
41
/// Process-wide state of the SIP user agent: configuration, SIP endpoint,
/// registration bookkeeping and call counters.
///
/// Normally shared behind an `Arc` (see the `AppState` alias).
pub struct AppStateInner {
    /// Immutable application configuration.
    pub config: Arc<Config>,
    /// Root cancellation token; `stop()` cancels it, and child tokens are
    /// derived from it for serving, pending invitations and registrations.
    pub token: CancellationToken,
    /// Shared media stream engine (not referenced by the methods in this part of the file).
    pub stream_engine: Arc<StreamEngine>,
    /// Optional sender for call records.
    pub callrecord_sender: Option<CallRecordSender>,
    /// SIP endpoint that yields incoming transactions and is driven by `serve()`.
    pub endpoint: Endpoint,
    /// Registration handles keyed by the address-of-record returned by `RegisterOption::aor()`.
    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
    /// Addresses-of-record whose most recent REGISTER succeeded; entries are
    /// removed before each refresh attempt and when the registration task ends.
    pub alive_users: Arc<RwLock<HashSet<String>>>,
    /// Dialog layer used to match in-dialog requests and create server INVITE dialogs.
    pub dialog_layer: Arc<DialogLayer>,
    /// Optional factory that replaces `default_create_invite_handler` for incoming INVITEs.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Invitation registry handed to each `PendingDialogGuard`.
    pub invitation: Invitation,
    /// Routing state passed to the invitation handler's `on_invite`.
    pub routing_state: Arc<crate::call::RoutingState>,
    // NOTE(review): not referenced in the visible part of this file; presumably
    // playbooks queued per session — confirm against the call handlers.
    pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
    // NOTE(review): not referenced in the visible part of this file; presumably
    // extra parameters queued per session — confirm against the call handlers.
    pub pending_params: Arc<Mutex<HashMap<String, HashMap<String, serde_json::Value>>>>,

    // NOTE(review): presumably keyed by session id — confirm against the call handlers.
    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
    // NOTE(review): counters are not updated in the visible part of this file.
    pub total_calls: AtomicU64,
    pub total_failed_calls: AtomicU64,
    // NOTE(review): presumably the process start time — confirm where it is initialised.
    pub uptime: DateTime<Local>,
    /// Raised by `stop()`; while set, new out-of-dialog INVITE/ACK requests are answered with 503.
    pub shutting_down: Arc<AtomicBool>,
}
63
/// Reference-counted handle to the shared [`AppStateInner`].
pub type AppState = Arc<AppStateInner>;
65
/// Builder that collects the optional pieces needed to construct an `AppState`.
///
/// Every field starts as `None` (see `AppStateBuilder::new`).
pub struct AppStateBuilder {
    /// Application configuration; `build()` uses `Config::default()` when unset.
    pub config: Option<Config>,
    /// Pre-built stream engine to share with the application state.
    pub stream_engine: Option<Arc<StreamEngine>>,
    /// Sender for call records.
    pub callrecord_sender: Option<CallRecordSender>,
    /// Formatter for call records.
    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
    /// Root cancellation token; `build()` creates a fresh one when unset.
    pub cancel_token: Option<CancellationToken>,
    /// Factory for the handler invoked on incoming INVITEs.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Path of the configuration file, as supplied to `with_config_metadata`.
    pub config_path: Option<String>,

    /// Inspector for SIP messages.
    pub message_inspector: Option<Box<dyn MessageInspector>>,
    /// Locator used to resolve request targets.
    pub target_locator: Option<Box<dyn TargetLocator>>,
    /// Inspector for transport-level events.
    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
}
79
80impl AppStateInner {
81    pub fn get_dump_events_file(&self, session_id: &String) -> String {
82        let recorder_root = self.config.recorder_path();
83        let root = Path::new(&recorder_root);
84        if !root.exists() {
85            match std::fs::create_dir_all(root) {
86                Ok(_) => {
87                    info!("created dump events root: {}", root.to_string_lossy());
88                }
89                Err(e) => {
90                    warn!(
91                        "Failed to create dump events root: {} {}",
92                        e,
93                        root.to_string_lossy()
94                    );
95                }
96            }
97        }
98        root.join(format!("{}.events.jsonl", session_id))
99            .to_string_lossy()
100            .to_string()
101    }
102
103    pub fn get_recorder_file(&self, session_id: &String) -> String {
104        let recorder_root = self.config.recorder_path();
105        let root = Path::new(&recorder_root);
106        if !root.exists() {
107            match std::fs::create_dir_all(root) {
108                Ok(_) => {
109                    info!("created recorder root: {}", root.to_string_lossy());
110                }
111                Err(e) => {
112                    warn!(
113                        "Failed to create recorder root: {} {}",
114                        e,
115                        root.to_string_lossy()
116                    );
117                }
118            }
119        }
120        let desired_ext = self.config.recorder_format().extension();
121        let mut filename = session_id.clone();
122        if !filename
123            .to_lowercase()
124            .ends_with(&format!(".{}", desired_ext.to_lowercase()))
125        {
126            filename = format!("{}.{}", filename, desired_ext);
127        }
128        root.join(filename).to_string_lossy().to_string()
129    }
130
131    pub async fn serve(self: Arc<Self>) -> Result<()> {
132        let incoming_txs = self.endpoint.incoming_transactions()?;
133        let token = self.token.child_token();
134        let endpoint_inner = self.endpoint.inner.clone();
135        let dialog_layer = self.dialog_layer.clone();
136        let app_state_clone = self.clone();
137
138        match self.start_registration().await {
139            Ok(count) => {
140                info!("registration started, count: {}", count);
141            }
142            Err(e) => {
143                warn!("failed to start registration: {:?}", e);
144            }
145        }
146
147        tokio::select! {
148            _ = token.cancelled() => {
149                info!("cancelled");
150            }
151            result = endpoint_inner.serve() => {
152                if let Err(e) = result {
153                    info!("endpoint serve error: {:?}", e);
154                }
155            }
156            result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
157                if let Err(e) = result {
158                    info!("process incoming request error: {:?}", e);
159                }
160            },
161        }
162
163        // Wait for registration to stop, if not stopped within 50 seconds,
164        // force stop it.
165        let timeout = self
166            .config
167            .graceful_shutdown
168            .map(|_| Duration::from_secs(50));
169
170        match self.stop_registration(timeout).await {
171            Ok(_) => {
172                info!("registration stopped, waiting for clear");
173            }
174            Err(e) => {
175                warn!("failed to stop registration: {:?}", e);
176            }
177        }
178        info!("stopping");
179        Ok(())
180    }
181
182    async fn process_incoming_request(
183        self: Arc<Self>,
184        dialog_layer: Arc<DialogLayer>,
185        mut incoming: TransactionReceiver,
186    ) -> Result<()> {
187        while let Some(mut tx) = incoming.recv().await {
188            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
189            info!(?key, "received transaction");
190            if tx.original.to_header()?.tag()?.as_ref().is_some() {
191                match dialog_layer.match_dialog(&tx) {
192                    Some(mut d) => {
193                        crate::spawn(async move {
194                            match d.handle(&mut tx).await {
195                                Ok(_) => (),
196                                Err(e) => {
197                                    info!("error handling transaction: {:?}", e);
198                                }
199                            }
200                        });
201                        continue;
202                    }
203                    None => {
204                        info!("dialog not found: {}", tx.original);
205                        match tx
206                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
207                            .await
208                        {
209                            Ok(_) => (),
210                            Err(e) => {
211                                info!("error replying to request: {:?}", e);
212                            }
213                        }
214                        continue;
215                    }
216                }
217            }
218            // out dialog, new server dialog
219            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
220            match tx.original.method {
221                rsip::Method::Invite | rsip::Method::Ack => {
222                    // Reject new INVITEs during graceful shutdown
223                    if self.shutting_down.load(Ordering::Relaxed) {
224                        info!(?key, "rejecting INVITE during graceful shutdown");
225                        match tx
226                            .reply_with(
227                                rsip::StatusCode::ServiceUnavailable,
228                                vec![rsip::Header::Other(
229                                    "Reason".into(),
230                                    "SIP;cause=503;text=\"Server shutting down\"".into(),
231                                )],
232                                None,
233                            )
234                            .await
235                        {
236                            Ok(_) => (),
237                            Err(e) => {
238                                info!("error replying to request: {:?}", e);
239                            }
240                        }
241                        continue;
242                    }
243
244                    let invitation_handler = match self.create_invitation_handler {
245                        Some(ref create_invitation_handler) => {
246                            create_invitation_handler(self.config.handler.as_ref()).ok()
247                        }
248                        _ => default_create_invite_handler(
249                            self.config.handler.as_ref(),
250                            Some(self.clone()),
251                        ),
252                    };
253                    let invitation_handler = match invitation_handler {
254                        Some(h) => h,
255                        None => {
256                            info!(?key, "no invite handler configured, rejecting INVITE");
257                            match tx
258                                .reply_with(
259                                    rsip::StatusCode::ServiceUnavailable,
260                                    vec![rsip::Header::Other(
261                                        "Reason".into(),
262                                        "SIP;cause=503;text=\"No invite handler configured\""
263                                            .into(),
264                                    )],
265                                    None,
266                                )
267                                .await
268                            {
269                                Ok(_) => (),
270                                Err(e) => {
271                                    info!("error replying to request: {:?}", e);
272                                }
273                            }
274                            continue;
275                        }
276                    };
277                    let contact = dialog_layer
278                        .endpoint
279                        .get_addrs()
280                        .first()
281                        .map(|addr| rsip::Uri {
282                            scheme: Some(rsip::Scheme::Sip),
283                            auth: None,
284                            host_with_port: addr.addr.clone(),
285                            params: vec![],
286                            headers: vec![],
287                        });
288
289                    let dialog = match dialog_layer.get_or_create_server_invite(
290                        &tx,
291                        state_sender,
292                        None,
293                        contact,
294                    ) {
295                        Ok(d) => d,
296                        Err(e) => {
297                            // 481 Dialog/Transaction Does Not Exist
298                            info!("failed to obtain dialog: {:?}", e);
299                            match tx
300                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
301                                .await
302                            {
303                                Ok(_) => (),
304                                Err(e) => {
305                                    info!("error replying to request: {:?}", e);
306                                }
307                            }
308                            continue;
309                        }
310                    };
311
312                    let dialog_id = dialog.id();
313                    let dialog_id_str = dialog_id.to_string();
314                    let token = self.token.child_token();
315                    let pending_dialog = PendingDialog {
316                        token: token.clone(),
317                        dialog: dialog.clone(),
318                        state_receiver,
319                    };
320
321                    let guard = Arc::new(PendingDialogGuard::new(
322                        self.invitation.clone(),
323                        dialog_id,
324                        pending_dialog,
325                    ));
326
327                    let accept_timeout = self
328                        .config
329                        .accept_timeout
330                        .as_ref()
331                        .and_then(|t| parse_duration(t).ok())
332                        .unwrap_or_else(|| Duration::from_secs(60));
333
334                    let token_ref = token.clone();
335                    let guard_ref = guard.clone();
336                    crate::spawn(async move {
337                        select! {
338                            _ = token_ref.cancelled() => {}
339                            _ = tokio::time::sleep(accept_timeout) => {}
340                        }
341                        guard_ref.drop_async().await;
342                    });
343
344                    let mut dialog_ref = dialog.clone();
345                    let token_ref = token.clone();
346                    let routing_state = self.routing_state.clone();
347                    let dialog_for_reject = dialog.clone();
348                    let guard_ref = guard.clone();
349                    crate::spawn(async move {
350                        let invite_loop = async {
351                            match invitation_handler
352                                .on_invite(
353                                    dialog_id_str.clone(),
354                                    token.clone(),
355                                    dialog.clone(),
356                                    routing_state,
357                                )
358                                .await
359                            {
360                                Ok(_) => (),
361                                Err(e) => {
362                                    // Webhook failed, reject the call immediately
363                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
364                                    let reason = format!("Failed to process invite: {}", e);
365                                    if let Err(reject_err) = dialog_for_reject.reject(
366                                        Some(rsip::StatusCode::ServiceUnavailable),
367                                        Some(reason),
368                                    ) {
369                                        info!(
370                                            id = dialog_id_str,
371                                            "error rejecting call: {:?}", reject_err
372                                        );
373                                    }
374                                    // Cancel token to stop dialog handling
375                                    token.cancel();
376                                    guard_ref.drop_async().await;
377                                }
378                            }
379                        };
380                        select! {
381                            _ = token_ref.cancelled() => {}
382                            _ = async {
383                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
384                             } => {}
385                        }
386                    });
387                }
388                rsip::Method::Options => {
389                    info!(?key, "ignoring out-of-dialog OPTIONS request");
390                    continue;
391                }
392                _ => {
393                    info!(?key, "received request: {:?}", tx.original.method);
394                    match tx.reply(rsip::StatusCode::OK).await {
395                        Ok(_) => (),
396                        Err(e) => {
397                            info!("error replying to request: {:?}", e);
398                        }
399                    }
400                }
401            }
402        }
403        Ok(())
404    }
405
    /// Marks the application as shutting down and cancels the root token.
    ///
    /// The `shutting_down` flag is raised before the token is cancelled, so
    /// the flag is already set by the time tasks observe the cancellation.
    pub fn stop(&self) {
        info!("stopping, marking as shutting down");
        self.shutting_down.store(true, Ordering::Relaxed);
        self.token.cancel();
    }
411
412    pub async fn start_registration(&self) -> Result<usize> {
413        let mut count = 0;
414        if let Some(register_users) = &self.config.register_users {
415            for option in register_users.iter() {
416                match self.register(option.clone()).await {
417                    Ok(_) => {
418                        count += 1;
419                    }
420                    Err(e) => {
421                        warn!("failed to register user: {:?} {:?}", e, option);
422                    }
423                }
424            }
425        }
426        Ok(count)
427    }
428
429    pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
430        let callee_uri = callee
431            .strip_prefix("sip:")
432            .or_else(|| callee.strip_prefix("sips:"))
433            .unwrap_or(callee);
434        let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
435            format!("sip:{}", callee_uri)
436        } else {
437            callee_uri.to_string()
438        };
439
440        let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
441            Ok(uri) => uri,
442            Err(e) => {
443                warn!("failed to parse callee URI: {} {:?}", callee, e);
444                return None;
445            }
446        };
447
448        let callee_host = match &parsed_callee.host_with_port.host {
449            rsip::Host::Domain(domain) => domain.to_string(),
450            rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
451        };
452
453        // Look through registered users to find one matching this domain
454        if let Some(register_users) = &self.config.register_users {
455            for option in register_users.iter() {
456                let mut server = option.server.clone();
457                if !server.starts_with("sip:") && !server.starts_with("sips:") {
458                    server = format!("sip:{}", server);
459                }
460
461                let parsed_server = match rsip::Uri::try_from(server.as_str()) {
462                    Ok(uri) => uri,
463                    Err(e) => {
464                        warn!("failed to parse server URI: {} {:?}", option.server, e);
465                        continue;
466                    }
467                };
468
469                let server_host = match &parsed_server.host_with_port.host {
470                    rsip::Host::Domain(domain) => domain.to_string(),
471                    rsip::Host::IpAddr(ip) => {
472                        // Compare IP addresses
473                        if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
474                            if ip == callee_ip {
475                                if let Some(cred) = &option.credential {
476                                    info!(
477                                        callee,
478                                        username = cred.username,
479                                        server = option.server,
480                                        "Auto-injecting credentials from registered user for outbound call (IP match)"
481                                    );
482                                    return Some(cred.clone());
483                                }
484                            }
485                        }
486                        continue;
487                    }
488                };
489
490                if server_host == callee_host {
491                    if let Some(cred) = &option.credential {
492                        info!(
493                            callee,
494                            username = cred.username,
495                            server = option.server,
496                            "Auto-injecting credentials from registered user for outbound call"
497                        );
498                        return Some(cred.clone());
499                    }
500                }
501            }
502        }
503
504        None
505    }
506
507    /// Helper function to find credentials by IP address
508    fn find_credentials_by_ip(
509        &self,
510        callee_ip: &std::net::IpAddr,
511    ) -> Option<crate::useragent::registration::UserCredential> {
512        if let Some(register_users) = &self.config.register_users {
513            for option in register_users.iter() {
514                let mut server = option.server.clone();
515                if !server.starts_with("sip:") && !server.starts_with("sips:") {
516                    server = format!("sip:{}", server);
517                }
518
519                if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
520                    if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
521                        if server_ip == callee_ip {
522                            if let Some(cred) = &option.credential {
523                                info!(
524                                    callee_ip = %callee_ip,
525                                    username = cred.username,
526                                    server = option.server,
527                                    "Auto-injecting credentials from registered user for outbound call (IP match)"
528                                );
529                                return Some(cred.clone());
530                            }
531                        }
532                    }
533                }
534            }
535        }
536        None
537    }
538
539    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
540        for (_, handle) in self.registration_handles.lock().await.iter_mut() {
541            handle.stop();
542        }
543
544        if let Some(duration) = wait_for_clear {
545            let live_users = self.alive_users.clone();
546            let check_loop = async move {
547                loop {
548                    let is_empty = {
549                        let users = live_users
550                            .read()
551                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
552                        users.is_empty()
553                    };
554                    if is_empty {
555                        break;
556                    }
557                    tokio::time::sleep(Duration::from_millis(50)).await;
558                }
559                Ok::<(), anyhow::Error>(())
560            };
561            match tokio::time::timeout(duration, check_loop).await {
562                Ok(_) => {}
563                Err(e) => {
564                    warn!("failed to wait for clear: {}", e);
565                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
566                }
567            }
568        }
569        Ok(())
570    }
571
572    pub async fn register(&self, option: RegisterOption) -> Result<()> {
573        let user = option.aor();
574        let mut server = option.server.clone();
575        if !server.starts_with("sip:") && !server.starts_with("sips:") {
576            server = format!("sip:{}", server);
577        }
578        let sip_server = match rsip::Uri::try_from(server) {
579            Ok(uri) => uri,
580            Err(e) => {
581                warn!("failed to parse server: {} {:?}", e, option.server);
582                return Err(anyhow::anyhow!("failed to parse server: {}", e));
583            }
584        };
585        let cancel_token = self.token.child_token();
586        let handle = RegistrationHandle {
587            inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
588                endpoint_inner: self.endpoint.inner.clone(),
589                option,
590                cancel_token,
591                start_time: Mutex::new(std::time::Instant::now()),
592                last_update: Mutex::new(std::time::Instant::now()),
593                last_response: Mutex::new(None),
594            }),
595        };
596        self.registration_handles
597            .lock()
598            .await
599            .insert(user.clone(), handle.clone());
600        tracing::debug!(user = user.as_str(), "starting registration task");
601        let alive_users = self.alive_users.clone();
602
603        crate::spawn(async move {
604            *handle.inner.start_time.lock().await = std::time::Instant::now();
605
606            select! {
607                _ = handle.inner.cancel_token.cancelled() => {
608                }
609                _ = async {
610                    loop {
611                        let user = handle.inner.option.aor();
612                        alive_users.write().unwrap().remove(&user);
613                        let refresh_time = match handle.do_register(&sip_server, None).await {
614                            Ok(expires) => {
615                                info!(
616                                    user = handle.inner.option.aor(),
617                                    expires = expires,
618                                    alive_users = alive_users.read().unwrap().len(),
619                                    "registration refreshed",
620                                );
621                                alive_users.write().unwrap().insert(user);
622                                expires * 3 / 4 // 75% of expiration time
623                            }
624                            Err(e) => {
625                                warn!(
626                                    user = handle.inner.option.aor(),
627                                    alive_users = alive_users.read().unwrap().len(),
628                                    "registration failed: {:?}", e);
629                                60
630                            }
631                        };
632                        tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
633                    }
634                } => {}
635            }
636            handle.do_register(&sip_server, Some(0)).await.ok();
637            alive_users.write().unwrap().remove(&user);
638        });
639        Ok(())
640    }
641}
642
/// Dropping the state runs the same shutdown path as an explicit `stop()`.
impl Drop for AppStateInner {
    fn drop(&mut self) {
        // Raises `shutting_down` and cancels `token`, which also cancels every
        // child token derived from it.
        self.stop();
    }
}
648
649impl AppStateBuilder {
    /// Creates a builder with every option unset.
    pub fn new() -> Self {
        Self {
            config: None,
            stream_engine: None,
            callrecord_sender: None,
            callrecord_formatter: None,
            cancel_token: None,
            create_invitation_handler: None,
            config_path: None,
            message_inspector: None,
            target_locator: None,
            transport_inspector: None,
        }
    }
664
665    pub fn with_config(mut self, config: Config) -> Self {
666        self.config = Some(config);
667        self
668    }
669
670    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
671        self.stream_engine = Some(stream_engine);
672        self
673    }
674
675    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
676        self.callrecord_sender = Some(sender);
677        self
678    }
679
680    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
681        self.cancel_token = Some(token);
682        self
683    }
684
685    pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
686        self.config_path = path;
687        self
688    }
689
690    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
691        self.message_inspector = Some(inspector);
692        self
693    }
694    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
695        self.target_locator = Some(locator);
696        self
697    }
698
699    pub fn with_transport_inspector(
700        &mut self,
701        inspector: Box<dyn TransportEventInspector>,
702    ) -> &mut Self {
703        self.transport_inspector = Some(inspector);
704        self
705    }
706
707    pub async fn build(self) -> Result<AppState> {
708        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
709        let token = self
710            .cancel_token
711            .unwrap_or_else(|| CancellationToken::new());
712        let _ = set_cache_dir(&config.media_cache_path);
713
714        let local_ip = if !config.addr.is_empty() {
715            std::net::IpAddr::from_str(config.addr.as_str())?
716        } else {
717            crate::net_tool::get_first_non_loopback_interface()?
718        };
719        let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
720        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
721
722        // Create UDP socket with SO_REUSEPORT for graceful restarts
723        #[cfg(unix)]
724        let std_socket = {
725            use socket2::{Domain, Protocol, SockAddr, Socket, Type};
726
727            let domain = if local_addr.is_ipv4() {
728                Domain::IPV4
729            } else {
730                Domain::IPV6
731            };
732            let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
733                .map_err(|err| anyhow::anyhow!("Failed to create UDP socket: {}", err))?;
734
735            socket
736                .set_reuse_address(true)
737                .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEADDR: {}", err))?;
738
739            // SO_REUSEPORT (Linux/BSD)
740            #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "cygwin")))]
741            socket
742                .set_reuse_port(true)
743                .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEPORT: {}", err))?;
744
745            socket
746                .bind(&SockAddr::from(local_addr))
747                .map_err(|err| anyhow::anyhow!("Failed to bind UDP socket: {}", err))?;
748
749            let std_socket: std::net::UdpSocket = socket.into();
750            std_socket
751        };
752
753        #[cfg(not(unix))]
754        let std_socket = std::net::UdpSocket::bind(local_addr)?;
755
756        std_socket.set_nonblocking(true)?;
757        let tokio_socket = tokio::net::UdpSocket::from_std(std_socket)?;
758        // Use the actual bound address (important when port=0 lets OS assign a port)
759        let actual_addr = tokio_socket.local_addr()?;
760
761        let udp_inner = rsipstack::transport::udp::UdpInner {
762            conn: tokio_socket,
763            addr: rsipstack::transport::SipAddr {
764                r#type: Some(rsip::transport::Transport::Udp),
765                addr: actual_addr.into(),
766            },
767        };
768
769        let udp_conn = rsipstack::transport::udp::UdpConnection::attach(
770            udp_inner,
771            None,
772            Some(token.child_token()),
773        )
774        .await;
775
776        transport_layer.add_transport(udp_conn.into());
777        info!(
778            "start useragent, addr: {} (SO_REUSEPORT enabled)",
779            local_addr
780        );
781
782        // Optional SIP over TLS transport
783        if let Some(tls_port) = config.tls_port {
784            let tls_addr: std::net::SocketAddr = format!("{}:{}", local_ip, tls_port).parse()?;
785            let tls_sip_addr = rsipstack::transport::SipAddr {
786                r#type: Some(rsip::transport::Transport::Tls),
787                addr: tls_addr.into(),
788            };
789            let mut tls_cfg = rsipstack::transport::tls::TlsConfig::default();
790            if let Some(ref cert_path) = config.tls_cert_file {
791                tls_cfg.cert = Some(
792                    std::fs::read(cert_path)
793                        .map_err(|e| anyhow::anyhow!("tls_cert_file: {}", e))?,
794                );
795            }
796            if let Some(ref key_path) = config.tls_key_file {
797                tls_cfg.key = Some(
798                    std::fs::read(key_path).map_err(|e| anyhow::anyhow!("tls_key_file: {}", e))?,
799                );
800            }
801            let external_tls_addr = config
802                .external_ip
803                .as_ref()
804                .and_then(|ip| format!("{}:{}", ip, tls_port).parse().ok());
805            match rsipstack::transport::tls::TlsListenerConnection::new(
806                tls_sip_addr,
807                external_tls_addr,
808                tls_cfg,
809            )
810            .await
811            {
812                Ok(tls_conn) => {
813                    transport_layer.add_transport(tls_conn.into());
814                    info!("TLS SIP transport started on {}:{}", local_ip, tls_port);
815                }
816                Err(e) => {
817                    return Err(anyhow::anyhow!("Failed to start TLS SIP transport: {}", e));
818                }
819            }
820        }
821
822        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
823        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
824        if let Some(ref user_agent) = config.useragent {
825            endpoint_builder.with_user_agent(user_agent.as_str());
826        }
827
828        let mut endpoint_builder = endpoint_builder
829            .with_cancel_token(token.child_token())
830            .with_transport_layer(transport_layer)
831            .with_option(endpoint_option);
832
833        if let Some(inspector) = self.message_inspector {
834            endpoint_builder = endpoint_builder.with_inspector(inspector);
835        }
836
837        if let Some(locator) = self.target_locator {
838            endpoint_builder.with_target_locator(locator);
839        } else if let Some(ref rules) = config.rewrites {
840            endpoint_builder
841                .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
842        }
843
844        if let Some(inspector) = self.transport_inspector {
845            endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
846        }
847
848        let endpoint = endpoint_builder.build();
849        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
850
851        let stream_engine = self.stream_engine.unwrap_or_default();
852
853        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
854            formatter
855        } else {
856            let formatter = if let Some(ref callrecord) = config.callrecord {
857                DefaultCallRecordFormatter::new_with_config(callrecord)
858            } else {
859                DefaultCallRecordFormatter::default()
860            };
861            Arc::new(formatter)
862        };
863
864        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
865            Some(sender)
866        } else if let Some(ref callrecord) = config.callrecord {
867            let builder = CallRecordManagerBuilder::new()
868                .with_cancel_token(token.child_token())
869                .with_config(callrecord.clone())
870                .with_max_concurrent(32)
871                .with_formatter(callrecord_formatter.clone());
872
873            let mut callrecord_manager = builder.build();
874            let sender = callrecord_manager.sender.clone();
875            crate::spawn(async move {
876                callrecord_manager.serve().await;
877            });
878            Some(sender)
879        } else {
880            None
881        };
882
883        let app_state = Arc::new(AppStateInner {
884            config,
885            token,
886            stream_engine,
887            callrecord_sender,
888            endpoint,
889            registration_handles: Mutex::new(HashMap::new()),
890            alive_users: Arc::new(RwLock::new(HashSet::new())),
891            dialog_layer: dialog_layer.clone(),
892            create_invitation_handler: self.create_invitation_handler,
893            invitation: Invitation::new(dialog_layer),
894            routing_state: Arc::new(crate::call::RoutingState::new()),
895            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
896            pending_params: Arc::new(Mutex::new(HashMap::new())),
897            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
898            total_calls: AtomicU64::new(0),
899            total_failed_calls: AtomicU64::new(0),
900            uptime: Local::now(),
901            shutting_down: Arc::new(AtomicBool::new(false)),
902        });
903
904        Ok(app_state)
905    }
906}