Skip to main content

active_call/
app.rs

1use crate::{
2    call::{ActiveCallRef, sip::Invitation},
3    callrecord::{
4        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5    },
6    config::Config,
7    locator::RewriteTargetLocator,
8    useragent::{
9        RegisterOption,
10        invitation::{
11            FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12            default_create_invite_handler,
13        },
14        public_address::{
15            LearnedPublicAddresses, LearningMessageInspector, build_public_contact_uri,
16        },
17        registration::{RegistrationHandle, UserCredential},
18    },
19};
20
21use crate::media::{cache::set_cache_dir, engine::StreamEngine};
22use anyhow::Result;
23use chrono::{DateTime, Local};
24use humantime::parse_duration;
25use rsip::prelude::HeadersExt;
26use rsipstack::transaction::{
27    Endpoint, TransactionReceiver,
28    endpoint::{TargetLocator, TransportEventInspector},
29};
30use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
31use std::collections::HashSet;
32use std::str::FromStr;
33use std::sync::{Arc, RwLock};
34use std::time::Duration;
35use std::{collections::HashMap, net::SocketAddr};
36use std::{
37    path::Path,
38    sync::atomic::{AtomicBool, AtomicU64, Ordering},
39};
40use tokio::select;
41use tokio::sync::Mutex;
42use tokio_util::sync::CancellationToken;
43use tracing::{info, warn};
44
45pub struct AppStateInner {
46    pub config: Arc<Config>,
47    pub token: CancellationToken,
48    pub stream_engine: Arc<StreamEngine>,
49    pub callrecord_sender: Option<CallRecordSender>,
50    pub endpoint: Endpoint,
51    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
52    pub alive_users: Arc<RwLock<HashSet<String>>>,
53    pub dialog_layer: Arc<DialogLayer>,
54    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
55    pub invitation: Invitation,
56    pub routing_state: Arc<crate::call::RoutingState>,
57    pub pending_playbooks: Arc<Mutex<HashMap<String, (String, std::time::Instant)>>>,
58    pub learned_public_addresses: LearnedPublicAddresses,
59
60    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
61    pub total_calls: AtomicU64,
62    pub total_failed_calls: AtomicU64,
63    pub uptime: DateTime<Local>,
64    pub shutting_down: Arc<AtomicBool>,
65}
66
67pub type AppState = Arc<AppStateInner>;
68
69pub struct AppStateBuilder {
70    pub config: Option<Config>,
71    pub stream_engine: Option<Arc<StreamEngine>>,
72    pub callrecord_sender: Option<CallRecordSender>,
73    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
74    pub cancel_token: Option<CancellationToken>,
75    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
76    pub config_path: Option<String>,
77
78    pub message_inspector: Option<Box<dyn MessageInspector>>,
79    pub target_locator: Option<Box<dyn TargetLocator>>,
80    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
81}
82
83impl AppStateInner {
84    pub fn auto_learn_public_address_enabled(&self) -> bool {
85        self.config.auto_learn_public_address.unwrap_or(false)
86    }
87
88    pub fn get_dump_events_file(&self, session_id: &String) -> String {
89        let recorder_root = self.config.recorder_path();
90        let root = Path::new(&recorder_root);
91        if !root.exists() {
92            match std::fs::create_dir_all(root) {
93                Ok(_) => {
94                    info!("created dump events root: {}", root.to_string_lossy());
95                }
96                Err(e) => {
97                    warn!(
98                        "Failed to create dump events root: {} {}",
99                        e,
100                        root.to_string_lossy()
101                    );
102                }
103            }
104        }
105        root.join(format!("{}.events.jsonl", session_id))
106            .to_string_lossy()
107            .to_string()
108    }
109
110    pub fn get_recorder_file(&self, session_id: &String) -> String {
111        let recorder_root = self.config.recorder_path();
112        let root = Path::new(&recorder_root);
113        if !root.exists() {
114            match std::fs::create_dir_all(root) {
115                Ok(_) => {
116                    info!("created recorder root: {}", root.to_string_lossy());
117                }
118                Err(e) => {
119                    warn!(
120                        "Failed to create recorder root: {} {}",
121                        e,
122                        root.to_string_lossy()
123                    );
124                }
125            }
126        }
127        let desired_ext = self.config.recorder_format().extension();
128        let mut filename = session_id.clone();
129        if !filename
130            .to_lowercase()
131            .ends_with(&format!(".{}", desired_ext.to_lowercase()))
132        {
133            filename = format!("{}.{}", filename, desired_ext);
134        }
135        root.join(filename).to_string_lossy().to_string()
136    }
137
138    pub async fn serve(self: Arc<Self>) -> Result<()> {
139        let incoming_txs = self.endpoint.incoming_transactions()?;
140        let token = self.token.child_token();
141        let endpoint_inner = self.endpoint.inner.clone();
142        let dialog_layer = self.dialog_layer.clone();
143        let app_state_clone = self.clone();
144
145        match self.start_registration().await {
146            Ok(count) => {
147                info!("registration started, count: {}", count);
148            }
149            Err(e) => {
150                warn!("failed to start registration: {:?}", e);
151            }
152        }
153
154        let pending_cleanup_state = self.clone();
155        let pending_cleanup_token = token.clone();
156        crate::spawn(async move {
157            let mut interval = tokio::time::interval(Duration::from_secs(60));
158            let ttl = Duration::from_secs(300);
159            loop {
160                tokio::select! {
161                    _ = pending_cleanup_token.cancelled() => break,
162                    _ = interval.tick() => {
163                        let mut pending = pending_cleanup_state.pending_playbooks.lock().await;
164                        let before = pending.len();
165                        pending.retain(|_, (_, created_at)| created_at.elapsed() < ttl);
166                        let removed = before - pending.len();
167                        if removed > 0 {
168                            info!(removed, remaining = pending.len(), "cleaned up stale pending_playbooks entries");
169                        }
170                    }
171                }
172            }
173        });
174
175        tokio::select! {
176            _ = token.cancelled() => {
177                info!("cancelled");
178            }
179            result = endpoint_inner.serve() => {
180                if let Err(e) = result {
181                    info!("endpoint serve error: {:?}", e);
182                }
183            }
184            result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
185                if let Err(e) = result {
186                    info!("process incoming request error: {:?}", e);
187                }
188            },
189        }
190
191        // Wait for registration to stop, if not stopped within 50 seconds,
192        // force stop it.
193        let timeout = self
194            .config
195            .graceful_shutdown
196            .map(|_| Duration::from_secs(10));
197
198        match self.stop_registration(timeout).await {
199            Ok(_) => {
200                info!("registration stopped, waiting for clear");
201            }
202            Err(e) => {
203                warn!("failed to stop registration: {:?}", e);
204            }
205        }
206        info!("stopping");
207        Ok(())
208    }
209
210    async fn process_incoming_request(
211        self: Arc<Self>,
212        dialog_layer: Arc<DialogLayer>,
213        mut incoming: TransactionReceiver,
214    ) -> Result<()> {
215        while let Some(mut tx) = incoming.recv().await {
216            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
217            info!(?key, "received transaction");
218            if tx.original.to_header()?.tag()?.as_ref().is_some() {
219                match dialog_layer.match_dialog(&tx) {
220                    Some(mut d) => {
221                        crate::spawn(async move {
222                            match d.handle(&mut tx).await {
223                                Ok(_) => (),
224                                Err(e) => {
225                                    info!("error handling transaction: {:?}", e);
226                                }
227                            }
228                        });
229                        continue;
230                    }
231                    None => {
232                        info!("dialog not found: {}", tx.original);
233                        match tx
234                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
235                            .await
236                        {
237                            Ok(_) => (),
238                            Err(e) => {
239                                info!("error replying to request: {:?}", e);
240                            }
241                        }
242                        continue;
243                    }
244                }
245            }
246            // out dialog, new server dialog
247            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
248            match tx.original.method {
249                rsip::Method::Invite | rsip::Method::Ack => {
250                    // Reject new INVITEs during graceful shutdown
251                    if self.shutting_down.load(Ordering::Relaxed) {
252                        info!(?key, "rejecting INVITE during graceful shutdown");
253                        match tx
254                            .reply_with(
255                                rsip::StatusCode::ServiceUnavailable,
256                                vec![rsip::Header::Other(
257                                    "Reason".into(),
258                                    "SIP;cause=503;text=\"Server shutting down\"".into(),
259                                )],
260                                None,
261                            )
262                            .await
263                        {
264                            Ok(_) => (),
265                            Err(e) => {
266                                info!("error replying to request: {:?}", e);
267                            }
268                        }
269                        continue;
270                    }
271
272                    let invitation_handler = match self.create_invitation_handler {
273                        Some(ref create_invitation_handler) => {
274                            create_invitation_handler(self.config.handler.as_ref()).ok()
275                        }
276                        _ => default_create_invite_handler(
277                            self.config.handler.as_ref(),
278                            Some(self.clone()),
279                        ),
280                    };
281                    let invitation_handler = match invitation_handler {
282                        Some(h) => h,
283                        None => {
284                            info!(?key, "no invite handler configured, rejecting INVITE");
285                            match tx
286                                .reply_with(
287                                    rsip::StatusCode::ServiceUnavailable,
288                                    vec![rsip::Header::Other(
289                                        "Reason".into(),
290                                        "SIP;cause=503;text=\"No invite handler configured\""
291                                            .into(),
292                                    )],
293                                    None,
294                                )
295                                .await
296                            {
297                                Ok(_) => (),
298                                Err(e) => {
299                                    info!("error replying to request: {:?}", e);
300                                }
301                            }
302                            continue;
303                        }
304                    };
305                    let local_addr = tx
306                        .connection
307                        .as_ref()
308                        .map(|connection| connection.get_addr().clone())
309                        .or_else(|| dialog_layer.endpoint.get_addrs().first().cloned());
310                    let contact_username =
311                        tx.original.uri.auth.as_ref().map(|auth| auth.user.as_str());
312                    let contact = local_addr.as_ref().map(|addr| {
313                        build_public_contact_uri(
314                            &self.learned_public_addresses,
315                            self.auto_learn_public_address_enabled(),
316                            addr,
317                            contact_username,
318                            None,
319                        )
320                    });
321
322                    let dialog = match dialog_layer.get_or_create_server_invite(
323                        &tx,
324                        state_sender,
325                        None,
326                        contact,
327                    ) {
328                        Ok(d) => d,
329                        Err(e) => {
330                            // 481 Dialog/Transaction Does Not Exist
331                            info!("failed to obtain dialog: {:?}", e);
332                            match tx
333                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
334                                .await
335                            {
336                                Ok(_) => (),
337                                Err(e) => {
338                                    info!("error replying to request: {:?}", e);
339                                }
340                            }
341                            continue;
342                        }
343                    };
344
345                    let dialog_id = dialog.id();
346                    let dialog_id_str = dialog_id.to_string();
347                    let token = self.token.child_token();
348                    let pending_dialog = PendingDialog {
349                        token: token.clone(),
350                        dialog: dialog.clone(),
351                        state_receiver,
352                    };
353
354                    let guard = Arc::new(PendingDialogGuard::new(
355                        self.invitation.clone(),
356                        dialog_id,
357                        pending_dialog,
358                    ));
359
360                    let accept_timeout = self
361                        .config
362                        .accept_timeout
363                        .as_ref()
364                        .and_then(|t| parse_duration(t).ok())
365                        .unwrap_or_else(|| Duration::from_secs(60));
366
367                    let token_ref = token.clone();
368                    let guard_ref = guard.clone();
369                    crate::spawn(async move {
370                        select! {
371                            _ = token_ref.cancelled() => {}
372                            _ = tokio::time::sleep(accept_timeout) => {}
373                        }
374                        guard_ref.drop_async().await;
375                    });
376
377                    let mut dialog_ref = dialog.clone();
378                    let token_ref = token.clone();
379                    let routing_state = self.routing_state.clone();
380                    let dialog_for_reject = dialog.clone();
381                    let guard_ref = guard.clone();
382                    crate::spawn(async move {
383                        let invite_loop = async {
384                            match invitation_handler
385                                .on_invite(
386                                    dialog_id_str.clone(),
387                                    token.clone(),
388                                    dialog.clone(),
389                                    routing_state,
390                                )
391                                .await
392                            {
393                                Ok(_) => (),
394                                Err(e) => {
395                                    // Webhook failed, reject the call immediately
396                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
397                                    let reason = format!("Failed to process invite: {}", e);
398                                    if let Err(reject_err) = dialog_for_reject.reject(
399                                        Some(rsip::StatusCode::ServiceUnavailable),
400                                        Some(reason),
401                                    ) {
402                                        info!(
403                                            id = dialog_id_str,
404                                            "error rejecting call: {:?}", reject_err
405                                        );
406                                    }
407                                    // Cancel token to stop dialog handling
408                                    token.cancel();
409                                    guard_ref.drop_async().await;
410                                }
411                            }
412                        };
413                        select! {
414                            _ = token_ref.cancelled() => {}
415                            _ = async {
416                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
417                             } => {}
418                        }
419                    });
420                }
421                rsip::Method::Options => {
422                    info!(?key, "ignoring out-of-dialog OPTIONS request");
423                    continue;
424                }
425                _ => {
426                    info!(?key, "received request: {:?}", tx.original.method);
427                    match tx.reply(rsip::StatusCode::OK).await {
428                        Ok(_) => (),
429                        Err(e) => {
430                            info!("error replying to request: {:?}", e);
431                        }
432                    }
433                }
434            }
435        }
436        Ok(())
437    }
438
439    pub fn stop(&self) {
440        if self.shutting_down.swap(true, Ordering::Relaxed) {
441            return;
442        }
443        info!("stopping, marking as shutting down");
444        self.token.cancel();
445    }
446    
447    pub async fn graceful_stop(&self) -> Result<()> {
448        if self.shutting_down.swap(true, Ordering::Relaxed) {
449            return Ok(());
450        }
451        
452        info!("graceful stopping, marking as shutting down");
453        let timeout = self
454            .config
455            .graceful_shutdown
456            .map(|_| Duration::from_secs(10));
457
458        self.stop_registration(timeout).await?;
459        self.token.cancel();
460        Ok(())
461    }
462
463    pub async fn start_registration(&self) -> Result<usize> {
464        let mut count = 0;
465        if let Some(register_users) = &self.config.register_users {
466            for option in register_users.iter() {
467                match self.register(option.clone()).await {
468                    Ok(_) => {
469                        count += 1;
470                    }
471                    Err(e) => {
472                        warn!("failed to register user: {:?} {:?}", e, option);
473                    }
474                }
475            }
476        }
477        Ok(count)
478    }
479
480    pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
481        let callee_uri = callee
482            .strip_prefix("sip:")
483            .or_else(|| callee.strip_prefix("sips:"))
484            .unwrap_or(callee);
485        let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
486            format!("sip:{}", callee_uri)
487        } else {
488            callee_uri.to_string()
489        };
490
491        let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
492            Ok(uri) => uri,
493            Err(e) => {
494                warn!("failed to parse callee URI: {} {:?}", callee, e);
495                return None;
496            }
497        };
498
499        let callee_host = match &parsed_callee.host_with_port.host {
500            rsip::Host::Domain(domain) => domain.to_string(),
501            rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
502        };
503
504        // Look through registered users to find one matching this domain
505        if let Some(register_users) = &self.config.register_users {
506            for option in register_users.iter() {
507                let mut server = option.server.clone();
508                if !server.starts_with("sip:") && !server.starts_with("sips:") {
509                    server = format!("sip:{}", server);
510                }
511
512                let parsed_server = match rsip::Uri::try_from(server.as_str()) {
513                    Ok(uri) => uri,
514                    Err(e) => {
515                        warn!("failed to parse server URI: {} {:?}", option.server, e);
516                        continue;
517                    }
518                };
519
520                let server_host = match &parsed_server.host_with_port.host {
521                    rsip::Host::Domain(domain) => domain.to_string(),
522                    rsip::Host::IpAddr(ip) => {
523                        // Compare IP addresses
524                        if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
525                            if ip == callee_ip {
526                                if let Some(cred) = &option.credential {
527                                    info!(
528                                        callee,
529                                        username = cred.username,
530                                        server = option.server,
531                                        "Auto-injecting credentials from registered user for outbound call (IP match)"
532                                    );
533                                    return Some(cred.clone());
534                                }
535                            }
536                        }
537                        continue;
538                    }
539                };
540
541                if server_host == callee_host {
542                    if let Some(cred) = &option.credential {
543                        info!(
544                            callee,
545                            username = cred.username,
546                            server = option.server,
547                            "Auto-injecting credentials from registered user for outbound call"
548                        );
549                        return Some(cred.clone());
550                    }
551                }
552            }
553        }
554
555        None
556    }
557
558    /// Helper function to find credentials by IP address
559    fn find_credentials_by_ip(
560        &self,
561        callee_ip: &std::net::IpAddr,
562    ) -> Option<crate::useragent::registration::UserCredential> {
563        if let Some(register_users) = &self.config.register_users {
564            for option in register_users.iter() {
565                let mut server = option.server.clone();
566                if !server.starts_with("sip:") && !server.starts_with("sips:") {
567                    server = format!("sip:{}", server);
568                }
569
570                if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
571                    if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
572                        if server_ip == callee_ip {
573                            if let Some(cred) = &option.credential {
574                                info!(
575                                    callee_ip = %callee_ip,
576                                    username = cred.username,
577                                    server = option.server,
578                                    "Auto-injecting credentials from registered user for outbound call (IP match)"
579                                );
580                                return Some(cred.clone());
581                            }
582                        }
583                    }
584                }
585            }
586        }
587        None
588    }
589
590    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
591        {
592            let mut handles = self.registration_handles.lock().await;
593            for (_, handle) in handles.iter_mut() {
594                handle.stop();
595            }
596            handles.clear();
597        }
598
599        if let Some(duration) = wait_for_clear {
600            let live_users = self.alive_users.clone();
601            let check_loop = async move {
602                loop {
603                    let is_empty = {
604                        let users = live_users
605                            .read()
606                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
607                        users.is_empty()
608                    };
609                    if is_empty {
610                        break;
611                    }
612                    tokio::time::sleep(Duration::from_millis(50)).await;
613                }
614                Ok::<(), anyhow::Error>(())
615            };
616            match tokio::time::timeout(duration, check_loop).await {
617                Ok(_) => {}
618                Err(e) => {
619                    warn!("failed to wait for clear: {}", e);
620                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
621                }
622            }
623        }
624        Ok(())
625    }
626
627    pub async fn register(&self, option: RegisterOption) -> Result<()> {
628        let user = option.aor();
629        let mut server = option.server.clone();
630        if !server.starts_with("sip:") && !server.starts_with("sips:") {
631            server = format!("sip:{}", server);
632        }
633        let sip_server = match rsip::Uri::try_from(server) {
634            Ok(uri) => uri,
635            Err(e) => {
636                warn!("failed to parse server: {} {:?}", e, option.server);
637                return Err(anyhow::anyhow!("failed to parse server: {}", e));
638            }
639        };
640        let cancel_token = self.token.child_token();
641        let credential = option.credential.clone().map(|c| c.into());
642        let registration = rsipstack::dialog::registration::Registration::new(
643            self.endpoint.inner.clone(),
644            credential,
645        );
646        let handle = RegistrationHandle {
647            inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
648                registration: Mutex::new(registration),
649                option,
650                cancel_token,
651                start_time: Mutex::new(std::time::Instant::now()),
652                last_update: Mutex::new(std::time::Instant::now()),
653                last_response: Mutex::new(None),
654            }),
655        };
656        self.registration_handles
657            .lock()
658            .await
659            .insert(user.clone(), handle.clone());
660        tracing::debug!(user = user.as_str(), "starting registration task");
661        let alive_users = self.alive_users.clone();
662
663        crate::spawn(async move {
664            *handle.inner.start_time.lock().await = std::time::Instant::now();
665
666            select! {
667                _ = handle.inner.cancel_token.cancelled() => {
668                }
669                _ = async {
670                    loop {
671                        let user = handle.inner.option.aor();
672                        alive_users.write().unwrap().remove(&user);
673                        let refresh_time = match handle.do_register(&sip_server, None).await {
674                            Ok(expires) => {
675                                info!(
676                                    user = handle.inner.option.aor(),
677                                    expires = expires,
678                                    alive_users = alive_users.read().unwrap().len(),
679                                    "registration refreshed",
680                                );
681                                alive_users.write().unwrap().insert(user);
682                                expires * 3 / 4 // 75% of expiration time
683                            }
684                            Err(e) => {
685                                warn!(
686                                    user = handle.inner.option.aor(),
687                                    alive_users = alive_users.read().unwrap().len(),
688                                    "registration failed: {:?}", e);
689                                60
690                            }
691                        };
692                        tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
693                    }
694                } => {}
695            }
696            handle.do_register(&sip_server, Some(0)).await.ok();
697            alive_users.write().unwrap().remove(&user);
698        });
699        Ok(())
700    }
701}
702
703impl Drop for AppStateInner {
704    fn drop(&mut self) {
705        self.stop();
706    }
707}
708
709impl AppStateBuilder {
710    pub fn new() -> Self {
711        Self {
712            config: None,
713            stream_engine: None,
714            callrecord_sender: None,
715            callrecord_formatter: None,
716            cancel_token: None,
717            create_invitation_handler: None,
718            config_path: None,
719            message_inspector: None,
720            target_locator: None,
721            transport_inspector: None,
722        }
723    }
724
725    pub fn with_config(mut self, config: Config) -> Self {
726        self.config = Some(config);
727        self
728    }
729
730    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
731        self.stream_engine = Some(stream_engine);
732        self
733    }
734
735    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
736        self.callrecord_sender = Some(sender);
737        self
738    }
739
740    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
741        self.cancel_token = Some(token);
742        self
743    }
744
745    pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
746        self.config_path = path;
747        self
748    }
749
750    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
751        self.message_inspector = Some(inspector);
752        self
753    }
754    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
755        self.target_locator = Some(locator);
756        self
757    }
758
759    pub fn with_transport_inspector(
760        &mut self,
761        inspector: Box<dyn TransportEventInspector>,
762    ) -> &mut Self {
763        self.transport_inspector = Some(inspector);
764        self
765    }
766
767    pub async fn build(self) -> Result<AppState> {
768        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
769        let token = self
770            .cancel_token
771            .unwrap_or_else(|| CancellationToken::new());
772        let _ = set_cache_dir(&config.media_cache_path);
773        let learned_public_addresses = LearnedPublicAddresses::default();
774
775        let local_ip = if !config.addr.is_empty() {
776            std::net::IpAddr::from_str(config.addr.as_str())?
777        } else {
778            crate::net_tool::get_first_non_loopback_interface()?
779        };
780        let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
781        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
782
783        // Create UDP socket with SO_REUSEPORT for graceful restarts
784        #[cfg(unix)]
785        let std_socket = {
786            use socket2::{Domain, Protocol, SockAddr, Socket, Type};
787
788            let domain = if local_addr.is_ipv4() {
789                Domain::IPV4
790            } else {
791                Domain::IPV6
792            };
793            let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
794                .map_err(|err| anyhow::anyhow!("Failed to create UDP socket: {}", err))?;
795
796            socket
797                .set_reuse_address(true)
798                .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEADDR: {}", err))?;
799
800            // SO_REUSEPORT (Linux/BSD)
801            #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "cygwin")))]
802            socket
803                .set_reuse_port(true)
804                .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEPORT: {}", err))?;
805
806            socket
807                .bind(&SockAddr::from(local_addr))
808                .map_err(|err| anyhow::anyhow!("Failed to bind UDP socket: {}", err))?;
809
810            let std_socket: std::net::UdpSocket = socket.into();
811            std_socket
812        };
813
814        #[cfg(not(unix))]
815        let std_socket = std::net::UdpSocket::bind(local_addr)?;
816
817        std_socket.set_nonblocking(true)?;
818        let tokio_socket = tokio::net::UdpSocket::from_std(std_socket)?;
819        // Use the actual bound address (important when port=0 lets OS assign a port)
820        let actual_addr = tokio_socket.local_addr()?;
821        let bind_addr = rsipstack::transport::SipConnection::resolve_bind_address(actual_addr);
822
823        let udp_inner = rsipstack::transport::udp::UdpInner {
824            conn: tokio_socket,
825            addr: rsipstack::transport::SipAddr {
826                r#type: Some(rsip::transport::Transport::Udp),
827                addr: bind_addr.into(),
828            },
829        };
830
831        let external = config
832            .external_ip
833            .as_ref()
834            .map(|ip| {
835                format!("{}:{}", ip, actual_addr.port())
836                    .parse()
837                    .map_err(|e| anyhow::anyhow!("Failed to parse external address: {}", e))
838            })
839            .transpose()?;
840
841        let udp_conn = rsipstack::transport::udp::UdpConnection::attach(
842            udp_inner,
843            external,
844            Some(token.child_token()),
845        )
846        .await;
847
848        info!(
849            "start useragent, addr: {} (SO_REUSEPORT enabled)",
850            udp_conn.get_addr()
851        );
852
853        transport_layer.add_transport(udp_conn.into());
854
855        // Optional SIP over TLS transport
856        if let Some(tls_port) = config.tls_port {
857            let tls_addr: std::net::SocketAddr = format!("{}:{}", local_ip, tls_port).parse()?;
858            let tls_sip_addr = rsipstack::transport::SipAddr {
859                r#type: Some(rsip::transport::Transport::Tls),
860                addr: tls_addr.into(),
861            };
862            let mut tls_cfg = rsipstack::transport::tls::TlsConfig::default();
863            if let Some(ref cert_path) = config.tls_cert_file {
864                tls_cfg.cert = Some(
865                    std::fs::read(cert_path)
866                        .map_err(|e| anyhow::anyhow!("tls_cert_file: {}", e))?,
867                );
868            }
869            if let Some(ref key_path) = config.tls_key_file {
870                tls_cfg.key = Some(
871                    std::fs::read(key_path).map_err(|e| anyhow::anyhow!("tls_key_file: {}", e))?,
872                );
873            }
874            let external_tls_addr = config
875                .external_ip
876                .as_ref()
877                .and_then(|ip| format!("{}:{}", ip, tls_port).parse().ok());
878            match rsipstack::transport::tls::TlsListenerConnection::new(
879                tls_sip_addr,
880                external_tls_addr,
881                tls_cfg,
882            )
883            .await
884            {
885                Ok(tls_conn) => {
886                    transport_layer.add_transport(tls_conn.into());
887                    info!("TLS SIP transport started on {}:{}", local_ip, tls_port);
888                }
889                Err(e) => {
890                    return Err(anyhow::anyhow!("Failed to start TLS SIP transport: {}", e));
891                }
892            }
893        }
894
895        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
896        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
897        if let Some(ref user_agent) = config.useragent {
898            endpoint_builder.with_user_agent(user_agent.as_str());
899        }
900
901        let mut endpoint_builder = endpoint_builder
902            .with_cancel_token(token.child_token())
903            .with_transport_layer(transport_layer)
904            .with_option(endpoint_option);
905
906        if config.auto_learn_public_address.unwrap_or(false) {
907            endpoint_builder =
908                endpoint_builder.with_inspector(Box::new(LearningMessageInspector::new(
909                    learned_public_addresses.clone(),
910                    self.message_inspector,
911                )));
912        } else if let Some(inspector) = self.message_inspector {
913            endpoint_builder = endpoint_builder.with_inspector(inspector);
914        }
915
916        if let Some(locator) = self.target_locator {
917            endpoint_builder.with_target_locator(locator);
918        } else if let Some(ref rules) = config.rewrites {
919            endpoint_builder
920                .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
921        }
922
923        if let Some(inspector) = self.transport_inspector {
924            endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
925        }
926
927        let endpoint = endpoint_builder.build();
928        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
929
930        let stream_engine = self.stream_engine.unwrap_or_default();
931
932        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
933            formatter
934        } else {
935            let formatter = if let Some(ref callrecord) = config.callrecord {
936                DefaultCallRecordFormatter::new_with_config(callrecord)
937            } else {
938                DefaultCallRecordFormatter::default()
939            };
940            Arc::new(formatter)
941        };
942
943        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
944            Some(sender)
945        } else if let Some(ref callrecord) = config.callrecord {
946            let builder = CallRecordManagerBuilder::new()
947                .with_cancel_token(token.child_token())
948                .with_config(callrecord.clone())
949                .with_max_concurrent(32)
950                .with_formatter(callrecord_formatter.clone());
951
952            let mut callrecord_manager = builder.build();
953            let sender = callrecord_manager.sender.clone();
954            crate::spawn(async move {
955                callrecord_manager.serve().await;
956            });
957            Some(sender)
958        } else {
959            None
960        };
961
962        let app_state = Arc::new(AppStateInner {
963            config,
964            token,
965            stream_engine,
966            callrecord_sender,
967            endpoint,
968            registration_handles: Mutex::new(HashMap::new()),
969            alive_users: Arc::new(RwLock::new(HashSet::new())),
970            dialog_layer: dialog_layer.clone(),
971            create_invitation_handler: self.create_invitation_handler,
972            invitation: Invitation::new(dialog_layer),
973            routing_state: Arc::new(crate::call::RoutingState::new()),
974            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
975            learned_public_addresses,
976            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
977            total_calls: AtomicU64::new(0),
978            total_failed_calls: AtomicU64::new(0),
979            uptime: Local::now(),
980            shutting_down: Arc::new(AtomicBool::new(false)),
981        });
982
983        Ok(app_state)
984    }
985}