Skip to main content

active_call/
app.rs

1use crate::{
2    call::{ActiveCallRef, sip::Invitation},
3    callrecord::{
4        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5    },
6    config::Config,
7    locator::RewriteTargetLocator,
8    useragent::{
9        RegisterOption,
10        invitation::{
11            FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12            default_create_invite_handler,
13        },
14        registration::{RegistrationHandle, UserCredential},
15    },
16};
17
18use crate::media::{cache::set_cache_dir, engine::StreamEngine};
19use anyhow::Result;
20use chrono::{DateTime, Local};
21use humantime::parse_duration;
22use rsip::prelude::HeadersExt;
23use rsipstack::transaction::{
24    Endpoint, TransactionReceiver,
25    endpoint::{TargetLocator, TransportEventInspector},
26};
27use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
28use std::collections::HashSet;
29use std::str::FromStr;
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use std::{collections::HashMap, net::SocketAddr};
33use std::{
34    path::Path,
35    sync::atomic::{AtomicBool, AtomicU64, Ordering},
36};
37use tokio::select;
38use tokio::sync::Mutex;
39use tokio_util::sync::CancellationToken;
40use tracing::{info, warn};
41
42pub struct AppStateInner {
43    pub config: Arc<Config>,
44    pub token: CancellationToken,
45    pub stream_engine: Arc<StreamEngine>,
46    pub callrecord_sender: Option<CallRecordSender>,
47    pub endpoint: Endpoint,
48    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
49    pub alive_users: Arc<RwLock<HashSet<String>>>,
50    pub dialog_layer: Arc<DialogLayer>,
51    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
52    pub invitation: Invitation,
53    pub routing_state: Arc<crate::call::RoutingState>,
54    pub pending_playbooks: Arc<Mutex<HashMap<String, (String, std::time::Instant)>>>,
55
56    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
57    pub total_calls: AtomicU64,
58    pub total_failed_calls: AtomicU64,
59    pub uptime: DateTime<Local>,
60    pub shutting_down: Arc<AtomicBool>,
61}
62
63pub type AppState = Arc<AppStateInner>;
64
65pub struct AppStateBuilder {
66    pub config: Option<Config>,
67    pub stream_engine: Option<Arc<StreamEngine>>,
68    pub callrecord_sender: Option<CallRecordSender>,
69    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
70    pub cancel_token: Option<CancellationToken>,
71    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
72    pub config_path: Option<String>,
73
74    pub message_inspector: Option<Box<dyn MessageInspector>>,
75    pub target_locator: Option<Box<dyn TargetLocator>>,
76    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
77}
78
79impl AppStateInner {
80    pub fn get_dump_events_file(&self, session_id: &String) -> String {
81        let recorder_root = self.config.recorder_path();
82        let root = Path::new(&recorder_root);
83        if !root.exists() {
84            match std::fs::create_dir_all(root) {
85                Ok(_) => {
86                    info!("created dump events root: {}", root.to_string_lossy());
87                }
88                Err(e) => {
89                    warn!(
90                        "Failed to create dump events root: {} {}",
91                        e,
92                        root.to_string_lossy()
93                    );
94                }
95            }
96        }
97        root.join(format!("{}.events.jsonl", session_id))
98            .to_string_lossy()
99            .to_string()
100    }
101
102    pub fn get_recorder_file(&self, session_id: &String) -> String {
103        let recorder_root = self.config.recorder_path();
104        let root = Path::new(&recorder_root);
105        if !root.exists() {
106            match std::fs::create_dir_all(root) {
107                Ok(_) => {
108                    info!("created recorder root: {}", root.to_string_lossy());
109                }
110                Err(e) => {
111                    warn!(
112                        "Failed to create recorder root: {} {}",
113                        e,
114                        root.to_string_lossy()
115                    );
116                }
117            }
118        }
119        let desired_ext = self.config.recorder_format().extension();
120        let mut filename = session_id.clone();
121        if !filename
122            .to_lowercase()
123            .ends_with(&format!(".{}", desired_ext.to_lowercase()))
124        {
125            filename = format!("{}.{}", filename, desired_ext);
126        }
127        root.join(filename).to_string_lossy().to_string()
128    }
129
130    pub async fn serve(self: Arc<Self>) -> Result<()> {
131        let incoming_txs = self.endpoint.incoming_transactions()?;
132        let token = self.token.child_token();
133        let endpoint_inner = self.endpoint.inner.clone();
134        let dialog_layer = self.dialog_layer.clone();
135        let app_state_clone = self.clone();
136
137        match self.start_registration().await {
138            Ok(count) => {
139                info!("registration started, count: {}", count);
140            }
141            Err(e) => {
142                warn!("failed to start registration: {:?}", e);
143            }
144        }
145
146        let pending_cleanup_state = self.clone();
147        let pending_cleanup_token = token.clone();
148        crate::spawn(async move {
149            let mut interval = tokio::time::interval(Duration::from_secs(60));
150            let ttl = Duration::from_secs(300);
151            loop {
152                tokio::select! {
153                    _ = pending_cleanup_token.cancelled() => break,
154                    _ = interval.tick() => {
155                        let mut pending = pending_cleanup_state.pending_playbooks.lock().await;
156                        let before = pending.len();
157                        pending.retain(|_, (_, created_at)| created_at.elapsed() < ttl);
158                        let removed = before - pending.len();
159                        if removed > 0 {
160                            info!(removed, remaining = pending.len(), "cleaned up stale pending_playbooks entries");
161                        }
162                    }
163                }
164            }
165        });
166
167        tokio::select! {
168            _ = token.cancelled() => {
169                info!("cancelled");
170            }
171            result = endpoint_inner.serve() => {
172                if let Err(e) = result {
173                    info!("endpoint serve error: {:?}", e);
174                }
175            }
176            result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
177                if let Err(e) = result {
178                    info!("process incoming request error: {:?}", e);
179                }
180            },
181        }
182
183        // Wait for registration to stop, if not stopped within 50 seconds,
184        // force stop it.
185        let timeout = self
186            .config
187            .graceful_shutdown
188            .map(|_| Duration::from_secs(50));
189
190        match self.stop_registration(timeout).await {
191            Ok(_) => {
192                info!("registration stopped, waiting for clear");
193            }
194            Err(e) => {
195                warn!("failed to stop registration: {:?}", e);
196            }
197        }
198        info!("stopping");
199        Ok(())
200    }
201
202    async fn process_incoming_request(
203        self: Arc<Self>,
204        dialog_layer: Arc<DialogLayer>,
205        mut incoming: TransactionReceiver,
206    ) -> Result<()> {
207        while let Some(mut tx) = incoming.recv().await {
208            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
209            info!(?key, "received transaction");
210            if tx.original.to_header()?.tag()?.as_ref().is_some() {
211                match dialog_layer.match_dialog(&tx) {
212                    Some(mut d) => {
213                        crate::spawn(async move {
214                            match d.handle(&mut tx).await {
215                                Ok(_) => (),
216                                Err(e) => {
217                                    info!("error handling transaction: {:?}", e);
218                                }
219                            }
220                        });
221                        continue;
222                    }
223                    None => {
224                        info!("dialog not found: {}", tx.original);
225                        match tx
226                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
227                            .await
228                        {
229                            Ok(_) => (),
230                            Err(e) => {
231                                info!("error replying to request: {:?}", e);
232                            }
233                        }
234                        continue;
235                    }
236                }
237            }
238            // out dialog, new server dialog
239            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
240            match tx.original.method {
241                rsip::Method::Invite | rsip::Method::Ack => {
242                    // Reject new INVITEs during graceful shutdown
243                    if self.shutting_down.load(Ordering::Relaxed) {
244                        info!(?key, "rejecting INVITE during graceful shutdown");
245                        match tx
246                            .reply_with(
247                                rsip::StatusCode::ServiceUnavailable,
248                                vec![rsip::Header::Other(
249                                    "Reason".into(),
250                                    "SIP;cause=503;text=\"Server shutting down\"".into(),
251                                )],
252                                None,
253                            )
254                            .await
255                        {
256                            Ok(_) => (),
257                            Err(e) => {
258                                info!("error replying to request: {:?}", e);
259                            }
260                        }
261                        continue;
262                    }
263
264                    let invitation_handler = match self.create_invitation_handler {
265                        Some(ref create_invitation_handler) => {
266                            create_invitation_handler(self.config.handler.as_ref()).ok()
267                        }
268                        _ => default_create_invite_handler(
269                            self.config.handler.as_ref(),
270                            Some(self.clone()),
271                        ),
272                    };
273                    let invitation_handler = match invitation_handler {
274                        Some(h) => h,
275                        None => {
276                            info!(?key, "no invite handler configured, rejecting INVITE");
277                            match tx
278                                .reply_with(
279                                    rsip::StatusCode::ServiceUnavailable,
280                                    vec![rsip::Header::Other(
281                                        "Reason".into(),
282                                        "SIP;cause=503;text=\"No invite handler configured\""
283                                            .into(),
284                                    )],
285                                    None,
286                                )
287                                .await
288                            {
289                                Ok(_) => (),
290                                Err(e) => {
291                                    info!("error replying to request: {:?}", e);
292                                }
293                            }
294                            continue;
295                        }
296                    };
297                    let contact = dialog_layer
298                        .endpoint
299                        .get_addrs()
300                        .first()
301                        .map(|addr| rsip::Uri {
302                            scheme: Some(rsip::Scheme::Sip),
303                            auth: None,
304                            host_with_port: addr.addr.clone(),
305                            params: vec![],
306                            headers: vec![],
307                        });
308
309                    let dialog = match dialog_layer.get_or_create_server_invite(
310                        &tx,
311                        state_sender,
312                        None,
313                        contact,
314                    ) {
315                        Ok(d) => d,
316                        Err(e) => {
317                            // 481 Dialog/Transaction Does Not Exist
318                            info!("failed to obtain dialog: {:?}", e);
319                            match tx
320                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
321                                .await
322                            {
323                                Ok(_) => (),
324                                Err(e) => {
325                                    info!("error replying to request: {:?}", e);
326                                }
327                            }
328                            continue;
329                        }
330                    };
331
332                    let dialog_id = dialog.id();
333                    let dialog_id_str = dialog_id.to_string();
334                    let token = self.token.child_token();
335                    let pending_dialog = PendingDialog {
336                        token: token.clone(),
337                        dialog: dialog.clone(),
338                        state_receiver,
339                    };
340
341                    let guard = Arc::new(PendingDialogGuard::new(
342                        self.invitation.clone(),
343                        dialog_id,
344                        pending_dialog,
345                    ));
346
347                    let accept_timeout = self
348                        .config
349                        .accept_timeout
350                        .as_ref()
351                        .and_then(|t| parse_duration(t).ok())
352                        .unwrap_or_else(|| Duration::from_secs(60));
353
354                    let token_ref = token.clone();
355                    let guard_ref = guard.clone();
356                    crate::spawn(async move {
357                        select! {
358                            _ = token_ref.cancelled() => {}
359                            _ = tokio::time::sleep(accept_timeout) => {}
360                        }
361                        guard_ref.drop_async().await;
362                    });
363
364                    let mut dialog_ref = dialog.clone();
365                    let token_ref = token.clone();
366                    let routing_state = self.routing_state.clone();
367                    let dialog_for_reject = dialog.clone();
368                    let guard_ref = guard.clone();
369                    crate::spawn(async move {
370                        let invite_loop = async {
371                            match invitation_handler
372                                .on_invite(
373                                    dialog_id_str.clone(),
374                                    token.clone(),
375                                    dialog.clone(),
376                                    routing_state,
377                                )
378                                .await
379                            {
380                                Ok(_) => (),
381                                Err(e) => {
382                                    // Webhook failed, reject the call immediately
383                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
384                                    let reason = format!("Failed to process invite: {}", e);
385                                    if let Err(reject_err) = dialog_for_reject.reject(
386                                        Some(rsip::StatusCode::ServiceUnavailable),
387                                        Some(reason),
388                                    ) {
389                                        info!(
390                                            id = dialog_id_str,
391                                            "error rejecting call: {:?}", reject_err
392                                        );
393                                    }
394                                    // Cancel token to stop dialog handling
395                                    token.cancel();
396                                    guard_ref.drop_async().await;
397                                }
398                            }
399                        };
400                        select! {
401                            _ = token_ref.cancelled() => {}
402                            _ = async {
403                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
404                             } => {}
405                        }
406                    });
407                }
408                rsip::Method::Options => {
409                    info!(?key, "ignoring out-of-dialog OPTIONS request");
410                    continue;
411                }
412                _ => {
413                    info!(?key, "received request: {:?}", tx.original.method);
414                    match tx.reply(rsip::StatusCode::OK).await {
415                        Ok(_) => (),
416                        Err(e) => {
417                            info!("error replying to request: {:?}", e);
418                        }
419                    }
420                }
421            }
422        }
423        Ok(())
424    }
425
426    pub fn stop(&self) {
427        info!("stopping, marking as shutting down");
428        self.shutting_down.store(true, Ordering::Relaxed);
429        self.token.cancel();
430    }
431
432    pub async fn start_registration(&self) -> Result<usize> {
433        let mut count = 0;
434        if let Some(register_users) = &self.config.register_users {
435            for option in register_users.iter() {
436                match self.register(option.clone()).await {
437                    Ok(_) => {
438                        count += 1;
439                    }
440                    Err(e) => {
441                        warn!("failed to register user: {:?} {:?}", e, option);
442                    }
443                }
444            }
445        }
446        Ok(count)
447    }
448
449    pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
450        let callee_uri = callee
451            .strip_prefix("sip:")
452            .or_else(|| callee.strip_prefix("sips:"))
453            .unwrap_or(callee);
454        let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
455            format!("sip:{}", callee_uri)
456        } else {
457            callee_uri.to_string()
458        };
459
460        let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
461            Ok(uri) => uri,
462            Err(e) => {
463                warn!("failed to parse callee URI: {} {:?}", callee, e);
464                return None;
465            }
466        };
467
468        let callee_host = match &parsed_callee.host_with_port.host {
469            rsip::Host::Domain(domain) => domain.to_string(),
470            rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
471        };
472
473        // Look through registered users to find one matching this domain
474        if let Some(register_users) = &self.config.register_users {
475            for option in register_users.iter() {
476                let mut server = option.server.clone();
477                if !server.starts_with("sip:") && !server.starts_with("sips:") {
478                    server = format!("sip:{}", server);
479                }
480
481                let parsed_server = match rsip::Uri::try_from(server.as_str()) {
482                    Ok(uri) => uri,
483                    Err(e) => {
484                        warn!("failed to parse server URI: {} {:?}", option.server, e);
485                        continue;
486                    }
487                };
488
489                let server_host = match &parsed_server.host_with_port.host {
490                    rsip::Host::Domain(domain) => domain.to_string(),
491                    rsip::Host::IpAddr(ip) => {
492                        // Compare IP addresses
493                        if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
494                            if ip == callee_ip {
495                                if let Some(cred) = &option.credential {
496                                    info!(
497                                        callee,
498                                        username = cred.username,
499                                        server = option.server,
500                                        "Auto-injecting credentials from registered user for outbound call (IP match)"
501                                    );
502                                    return Some(cred.clone());
503                                }
504                            }
505                        }
506                        continue;
507                    }
508                };
509
510                if server_host == callee_host {
511                    if let Some(cred) = &option.credential {
512                        info!(
513                            callee,
514                            username = cred.username,
515                            server = option.server,
516                            "Auto-injecting credentials from registered user for outbound call"
517                        );
518                        return Some(cred.clone());
519                    }
520                }
521            }
522        }
523
524        None
525    }
526
527    /// Helper function to find credentials by IP address
528    fn find_credentials_by_ip(
529        &self,
530        callee_ip: &std::net::IpAddr,
531    ) -> Option<crate::useragent::registration::UserCredential> {
532        if let Some(register_users) = &self.config.register_users {
533            for option in register_users.iter() {
534                let mut server = option.server.clone();
535                if !server.starts_with("sip:") && !server.starts_with("sips:") {
536                    server = format!("sip:{}", server);
537                }
538
539                if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
540                    if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
541                        if server_ip == callee_ip {
542                            if let Some(cred) = &option.credential {
543                                info!(
544                                    callee_ip = %callee_ip,
545                                    username = cred.username,
546                                    server = option.server,
547                                    "Auto-injecting credentials from registered user for outbound call (IP match)"
548                                );
549                                return Some(cred.clone());
550                            }
551                        }
552                    }
553                }
554            }
555        }
556        None
557    }
558
559    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
560        {
561            let mut handles = self.registration_handles.lock().await;
562            for (_, handle) in handles.iter_mut() {
563                handle.stop();
564            }
565            handles.clear();
566        }
567
568        if let Some(duration) = wait_for_clear {
569            let live_users = self.alive_users.clone();
570            let check_loop = async move {
571                loop {
572                    let is_empty = {
573                        let users = live_users
574                            .read()
575                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
576                        users.is_empty()
577                    };
578                    if is_empty {
579                        break;
580                    }
581                    tokio::time::sleep(Duration::from_millis(50)).await;
582                }
583                Ok::<(), anyhow::Error>(())
584            };
585            match tokio::time::timeout(duration, check_loop).await {
586                Ok(_) => {}
587                Err(e) => {
588                    warn!("failed to wait for clear: {}", e);
589                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
590                }
591            }
592        }
593        Ok(())
594    }
595
596    pub async fn register(&self, option: RegisterOption) -> Result<()> {
597        let user = option.aor();
598        let mut server = option.server.clone();
599        if !server.starts_with("sip:") && !server.starts_with("sips:") {
600            server = format!("sip:{}", server);
601        }
602        let sip_server = match rsip::Uri::try_from(server) {
603            Ok(uri) => uri,
604            Err(e) => {
605                warn!("failed to parse server: {} {:?}", e, option.server);
606                return Err(anyhow::anyhow!("failed to parse server: {}", e));
607            }
608        };
609        let cancel_token = self.token.child_token();
610        let handle = RegistrationHandle {
611            inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
612                endpoint_inner: self.endpoint.inner.clone(),
613                option,
614                cancel_token,
615                start_time: Mutex::new(std::time::Instant::now()),
616                last_update: Mutex::new(std::time::Instant::now()),
617                last_response: Mutex::new(None),
618            }),
619        };
620        self.registration_handles
621            .lock()
622            .await
623            .insert(user.clone(), handle.clone());
624        tracing::debug!(user = user.as_str(), "starting registration task");
625        let alive_users = self.alive_users.clone();
626
627        crate::spawn(async move {
628            *handle.inner.start_time.lock().await = std::time::Instant::now();
629
630            select! {
631                _ = handle.inner.cancel_token.cancelled() => {
632                }
633                _ = async {
634                    loop {
635                        let user = handle.inner.option.aor();
636                        alive_users.write().unwrap().remove(&user);
637                        let refresh_time = match handle.do_register(&sip_server, None).await {
638                            Ok(expires) => {
639                                info!(
640                                    user = handle.inner.option.aor(),
641                                    expires = expires,
642                                    alive_users = alive_users.read().unwrap().len(),
643                                    "registration refreshed",
644                                );
645                                alive_users.write().unwrap().insert(user);
646                                expires * 3 / 4 // 75% of expiration time
647                            }
648                            Err(e) => {
649                                warn!(
650                                    user = handle.inner.option.aor(),
651                                    alive_users = alive_users.read().unwrap().len(),
652                                    "registration failed: {:?}", e);
653                                60
654                            }
655                        };
656                        tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
657                    }
658                } => {}
659            }
660            handle.do_register(&sip_server, Some(0)).await.ok();
661            alive_users.write().unwrap().remove(&user);
662        });
663        Ok(())
664    }
665}
666
667impl Drop for AppStateInner {
668    fn drop(&mut self) {
669        self.stop();
670    }
671}
672
673impl AppStateBuilder {
674    pub fn new() -> Self {
675        Self {
676            config: None,
677            stream_engine: None,
678            callrecord_sender: None,
679            callrecord_formatter: None,
680            cancel_token: None,
681            create_invitation_handler: None,
682            config_path: None,
683            message_inspector: None,
684            target_locator: None,
685            transport_inspector: None,
686        }
687    }
688
689    pub fn with_config(mut self, config: Config) -> Self {
690        self.config = Some(config);
691        self
692    }
693
694    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
695        self.stream_engine = Some(stream_engine);
696        self
697    }
698
699    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
700        self.callrecord_sender = Some(sender);
701        self
702    }
703
704    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
705        self.cancel_token = Some(token);
706        self
707    }
708
709    pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
710        self.config_path = path;
711        self
712    }
713
714    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
715        self.message_inspector = Some(inspector);
716        self
717    }
718    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
719        self.target_locator = Some(locator);
720        self
721    }
722
723    pub fn with_transport_inspector(
724        &mut self,
725        inspector: Box<dyn TransportEventInspector>,
726    ) -> &mut Self {
727        self.transport_inspector = Some(inspector);
728        self
729    }
730
731    pub async fn build(self) -> Result<AppState> {
732        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
733        let token = self
734            .cancel_token
735            .unwrap_or_else(|| CancellationToken::new());
736        let _ = set_cache_dir(&config.media_cache_path);
737
738        let local_ip = if !config.addr.is_empty() {
739            std::net::IpAddr::from_str(config.addr.as_str())?
740        } else {
741            crate::net_tool::get_first_non_loopback_interface()?
742        };
743        let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
744        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
745
746        // Create UDP socket with SO_REUSEPORT for graceful restarts
747        #[cfg(unix)]
748        let std_socket = {
749            use socket2::{Domain, Protocol, SockAddr, Socket, Type};
750
751            let domain = if local_addr.is_ipv4() {
752                Domain::IPV4
753            } else {
754                Domain::IPV6
755            };
756            let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
757                .map_err(|err| anyhow::anyhow!("Failed to create UDP socket: {}", err))?;
758
759            socket
760                .set_reuse_address(true)
761                .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEADDR: {}", err))?;
762
763            // SO_REUSEPORT (Linux/BSD)
764            #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "cygwin")))]
765            socket
766                .set_reuse_port(true)
767                .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEPORT: {}", err))?;
768
769            socket
770                .bind(&SockAddr::from(local_addr))
771                .map_err(|err| anyhow::anyhow!("Failed to bind UDP socket: {}", err))?;
772
773            let std_socket: std::net::UdpSocket = socket.into();
774            std_socket
775        };
776
777        #[cfg(not(unix))]
778        let std_socket = std::net::UdpSocket::bind(local_addr)?;
779
780        std_socket.set_nonblocking(true)?;
781        let tokio_socket = tokio::net::UdpSocket::from_std(std_socket)?;
782        // Use the actual bound address (important when port=0 lets OS assign a port)
783        let actual_addr = tokio_socket.local_addr()?;
784
785        let udp_inner = rsipstack::transport::udp::UdpInner {
786            conn: tokio_socket,
787            addr: rsipstack::transport::SipAddr {
788                r#type: Some(rsip::transport::Transport::Udp),
789                addr: actual_addr.into(),
790            },
791        };
792
793        let udp_conn = rsipstack::transport::udp::UdpConnection::attach(
794            udp_inner,
795            None,
796            Some(token.child_token()),
797        )
798        .await;
799
800        transport_layer.add_transport(udp_conn.into());
801        info!(
802            "start useragent, addr: {} (SO_REUSEPORT enabled)",
803            local_addr
804        );
805
806        // Optional SIP over TLS transport
807        if let Some(tls_port) = config.tls_port {
808            let tls_addr: std::net::SocketAddr = format!("{}:{}", local_ip, tls_port).parse()?;
809            let tls_sip_addr = rsipstack::transport::SipAddr {
810                r#type: Some(rsip::transport::Transport::Tls),
811                addr: tls_addr.into(),
812            };
813            let mut tls_cfg = rsipstack::transport::tls::TlsConfig::default();
814            if let Some(ref cert_path) = config.tls_cert_file {
815                tls_cfg.cert = Some(
816                    std::fs::read(cert_path)
817                        .map_err(|e| anyhow::anyhow!("tls_cert_file: {}", e))?,
818                );
819            }
820            if let Some(ref key_path) = config.tls_key_file {
821                tls_cfg.key = Some(
822                    std::fs::read(key_path).map_err(|e| anyhow::anyhow!("tls_key_file: {}", e))?,
823                );
824            }
825            let external_tls_addr = config
826                .external_ip
827                .as_ref()
828                .and_then(|ip| format!("{}:{}", ip, tls_port).parse().ok());
829            match rsipstack::transport::tls::TlsListenerConnection::new(
830                tls_sip_addr,
831                external_tls_addr,
832                tls_cfg,
833            )
834            .await
835            {
836                Ok(tls_conn) => {
837                    transport_layer.add_transport(tls_conn.into());
838                    info!("TLS SIP transport started on {}:{}", local_ip, tls_port);
839                }
840                Err(e) => {
841                    return Err(anyhow::anyhow!("Failed to start TLS SIP transport: {}", e));
842                }
843            }
844        }
845
846        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
847        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
848        if let Some(ref user_agent) = config.useragent {
849            endpoint_builder.with_user_agent(user_agent.as_str());
850        }
851
852        let mut endpoint_builder = endpoint_builder
853            .with_cancel_token(token.child_token())
854            .with_transport_layer(transport_layer)
855            .with_option(endpoint_option);
856
857        if let Some(inspector) = self.message_inspector {
858            endpoint_builder = endpoint_builder.with_inspector(inspector);
859        }
860
861        if let Some(locator) = self.target_locator {
862            endpoint_builder.with_target_locator(locator);
863        } else if let Some(ref rules) = config.rewrites {
864            endpoint_builder
865                .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
866        }
867
868        if let Some(inspector) = self.transport_inspector {
869            endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
870        }
871
872        let endpoint = endpoint_builder.build();
873        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
874
875        let stream_engine = self.stream_engine.unwrap_or_default();
876
877        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
878            formatter
879        } else {
880            let formatter = if let Some(ref callrecord) = config.callrecord {
881                DefaultCallRecordFormatter::new_with_config(callrecord)
882            } else {
883                DefaultCallRecordFormatter::default()
884            };
885            Arc::new(formatter)
886        };
887
888        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
889            Some(sender)
890        } else if let Some(ref callrecord) = config.callrecord {
891            let builder = CallRecordManagerBuilder::new()
892                .with_cancel_token(token.child_token())
893                .with_config(callrecord.clone())
894                .with_max_concurrent(32)
895                .with_formatter(callrecord_formatter.clone());
896
897            let mut callrecord_manager = builder.build();
898            let sender = callrecord_manager.sender.clone();
899            crate::spawn(async move {
900                callrecord_manager.serve().await;
901            });
902            Some(sender)
903        } else {
904            None
905        };
906
907        let app_state = Arc::new(AppStateInner {
908            config,
909            token,
910            stream_engine,
911            callrecord_sender,
912            endpoint,
913            registration_handles: Mutex::new(HashMap::new()),
914            alive_users: Arc::new(RwLock::new(HashSet::new())),
915            dialog_layer: dialog_layer.clone(),
916            create_invitation_handler: self.create_invitation_handler,
917            invitation: Invitation::new(dialog_layer),
918            routing_state: Arc::new(crate::call::RoutingState::new()),
919            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
920            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
921            total_calls: AtomicU64::new(0),
922            total_failed_calls: AtomicU64::new(0),
923            uptime: Local::now(),
924            shutting_down: Arc::new(AtomicBool::new(false)),
925        });
926
927        Ok(app_state)
928    }
929}