Skip to main content

active_call/
app.rs

1use crate::{
2    call::{ActiveCallRef, sip::Invitation},
3    callrecord::{
4        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5    },
6    config::Config,
7    locator::RewriteTargetLocator,
8    useragent::{
9        RegisterOption,
10        invitation::{
11            FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12            default_create_invite_handler,
13        },
14        public_address::{
15            LearningMessageInspector, SharedPublicAddress, build_contact, build_public_contact_uri,
16            find_local_addr_for_uri,
17        },
18        registration::{RegistrationHandle, UserCredential},
19    },
20};
21
22use crate::media::{cache::set_cache_dir, engine::StreamEngine};
23use anyhow::Result;
24use arc_swap::ArcSwap;
25use chrono::{DateTime, Local};
26use futures::FutureExt;
27use humantime::parse_duration;
28use rsip::prelude::HeadersExt;
29use rsipstack::transaction::{
30    Endpoint, TransactionReceiver,
31    endpoint::{TargetLocator, TransportEventInspector},
32};
33use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
34use std::future::pending;
35use std::str::FromStr;
36use std::sync::{Arc, RwLock};
37use std::time::Duration;
38use std::{collections::HashMap, net::SocketAddr};
39use std::{collections::HashSet, time::Instant};
40use std::{
41    path::Path,
42    sync::atomic::{AtomicBool, AtomicU64, Ordering},
43};
44use tokio::select;
45use tokio::sync::Mutex;
46use tokio_util::sync::CancellationToken;
47use tracing::{info, warn};
48
49pub struct AppStateInner {
50    pub config: Arc<Config>,
51    pub token: CancellationToken,
52    pub stream_engine: Arc<StreamEngine>,
53    pub callrecord_sender: Option<CallRecordSender>,
54    pub endpoint: Endpoint,
55    pub registration_handles: Mutex<HashMap<String, CancellationToken>>,
56    pub alive_users: Arc<RwLock<HashSet<String>>>,
57    pub dialog_layer: Arc<DialogLayer>,
58    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
59    pub invitation: Invitation,
60    pub routing_state: Arc<crate::call::RoutingState>,
61    pub pending_playbooks: Arc<Mutex<HashMap<String, (String, Instant)>>>,
62    pub learned_public_address: SharedPublicAddress,
63
64    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
65    pub total_calls: AtomicU64,
66    pub total_failed_calls: AtomicU64,
67    pub uptime: DateTime<Local>,
68    pub shutting_down: Arc<AtomicBool>,
69}
70
71pub type AppState = Arc<AppStateInner>;
72
73pub struct AppStateBuilder {
74    pub config: Option<Config>,
75    pub stream_engine: Option<Arc<StreamEngine>>,
76    pub callrecord_sender: Option<CallRecordSender>,
77    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
78    pub cancel_token: Option<CancellationToken>,
79    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
80    pub config_path: Option<String>,
81
82    pub message_inspector: Option<Box<dyn MessageInspector>>,
83    pub target_locator: Option<Box<dyn TargetLocator>>,
84    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
85}
86
87impl AppStateInner {
88    pub fn auto_learn_public_address_enabled(&self) -> bool {
89        self.config.auto_learn_public_address.unwrap_or(false)
90    }
91
92    pub fn get_dump_events_file(&self, session_id: &String) -> String {
93        let recorder_root = self.config.recorder_path();
94        let root = Path::new(&recorder_root);
95        if !root.exists() {
96            match std::fs::create_dir_all(root) {
97                Ok(_) => {
98                    info!("created dump events root: {}", root.to_string_lossy());
99                }
100                Err(e) => {
101                    warn!(
102                        "Failed to create dump events root: {} {}",
103                        e,
104                        root.to_string_lossy()
105                    );
106                }
107            }
108        }
109        root.join(format!("{}.events.jsonl", session_id))
110            .to_string_lossy()
111            .to_string()
112    }
113
114    pub fn get_recorder_file(&self, session_id: &String) -> String {
115        let recorder_root = self.config.recorder_path();
116        let root = Path::new(&recorder_root);
117        if !root.exists() {
118            match std::fs::create_dir_all(root) {
119                Ok(_) => {
120                    info!("created recorder root: {}", root.to_string_lossy());
121                }
122                Err(e) => {
123                    warn!(
124                        "Failed to create recorder root: {} {}",
125                        e,
126                        root.to_string_lossy()
127                    );
128                }
129            }
130        }
131        let desired_ext = self.config.recorder_format().extension();
132        let mut filename = session_id.clone();
133        if !filename
134            .to_lowercase()
135            .ends_with(&format!(".{}", desired_ext.to_lowercase()))
136        {
137            filename = format!("{}.{}", filename, desired_ext);
138        }
139        root.join(filename).to_string_lossy().to_string()
140    }
141
142    pub async fn serve(self: Arc<Self>) -> Result<()> {
143        let incoming_txs = self.endpoint.incoming_transactions()?;
144        let token = self.token.child_token();
145        let endpoint_inner = self.endpoint.inner.clone();
146        let dialog_layer = self.dialog_layer.clone();
147        let app_state_clone = self.clone();
148
149        match self.start_registration().await {
150            Ok(count) => {
151                info!("registration started, count: {}", count);
152            }
153            Err(e) => {
154                warn!("failed to start registration: {:?}", e);
155            }
156        }
157
158        let pending_cleanup_state = self.clone();
159        let pending_cleanup_token = token.clone();
160        crate::spawn(async move {
161            let mut interval = tokio::time::interval(Duration::from_secs(60));
162            let ttl = Duration::from_secs(300);
163            loop {
164                tokio::select! {
165                    _ = pending_cleanup_token.cancelled() => break,
166                    _ = interval.tick() => {
167                        let mut pending = pending_cleanup_state.pending_playbooks.lock().await;
168                        let before = pending.len();
169                        pending.retain(|_, (_, created_at)| created_at.elapsed() < ttl);
170                        let removed = before - pending.len();
171                        if removed > 0 {
172                            info!(removed, remaining = pending.len(), "cleaned up stale pending_playbooks entries");
173                        }
174                    }
175                }
176            }
177        });
178
179        tokio::select! {
180            _ = token.cancelled() => {
181                info!("cancelled");
182            }
183            result = endpoint_inner.serve() => {
184                if let Err(e) = result {
185                    info!("endpoint serve error: {:?}", e);
186                }
187            }
188            result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
189                if let Err(e) = result {
190                    info!("process incoming request error: {:?}", e);
191                }
192            },
193        }
194
195        // Wait for registration to stop, if not stopped within 50 seconds,
196        // force stop it.
197        let timeout = self
198            .config
199            .graceful_shutdown
200            .map(|_| Duration::from_secs(10));
201
202        match self.stop_registration(timeout).await {
203            Ok(_) => {
204                info!("registration stopped, waiting for clear");
205            }
206            Err(e) => {
207                warn!("failed to stop registration: {:?}", e);
208            }
209        }
210        info!("stopping");
211        Ok(())
212    }
213
214    async fn process_incoming_request(
215        self: Arc<Self>,
216        dialog_layer: Arc<DialogLayer>,
217        mut incoming: TransactionReceiver,
218    ) -> Result<()> {
219        while let Some(mut tx) = incoming.recv().await {
220            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
221            info!(?key, "received transaction");
222            if tx.original.to_header()?.tag()?.as_ref().is_some() {
223                match dialog_layer.match_dialog(&tx) {
224                    Some(mut d) => {
225                        crate::spawn(async move {
226                            match d.handle(&mut tx).await {
227                                Ok(_) => (),
228                                Err(e) => {
229                                    info!("error handling transaction: {:?}", e);
230                                }
231                            }
232                        });
233                        continue;
234                    }
235                    None => {
236                        info!("dialog not found: {}", tx.original);
237                        match tx
238                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
239                            .await
240                        {
241                            Ok(_) => (),
242                            Err(e) => {
243                                info!("error replying to request: {:?}", e);
244                            }
245                        }
246                        continue;
247                    }
248                }
249            }
250            // out dialog, new server dialog
251            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
252            match tx.original.method {
253                rsip::Method::Invite | rsip::Method::Ack => {
254                    // Reject new INVITEs during graceful shutdown
255                    if self.shutting_down.load(Ordering::Relaxed) {
256                        info!(?key, "rejecting INVITE during graceful shutdown");
257                        match tx
258                            .reply_with(
259                                rsip::StatusCode::ServiceUnavailable,
260                                vec![rsip::Header::Other(
261                                    "Reason".into(),
262                                    "SIP;cause=503;text=\"Server shutting down\"".into(),
263                                )],
264                                None,
265                            )
266                            .await
267                        {
268                            Ok(_) => (),
269                            Err(e) => {
270                                info!("error replying to request: {:?}", e);
271                            }
272                        }
273                        continue;
274                    }
275
276                    let invitation_handler = match self.create_invitation_handler {
277                        Some(ref create_invitation_handler) => {
278                            create_invitation_handler(self.config.handler.as_ref()).ok()
279                        }
280                        _ => default_create_invite_handler(
281                            self.config.handler.as_ref(),
282                            Some(self.clone()),
283                        ),
284                    };
285                    let invitation_handler = match invitation_handler {
286                        Some(h) => h,
287                        None => {
288                            info!(?key, "no invite handler configured, rejecting INVITE");
289                            match tx
290                                .reply_with(
291                                    rsip::StatusCode::ServiceUnavailable,
292                                    vec![rsip::Header::Other(
293                                        "Reason".into(),
294                                        "SIP;cause=503;text=\"No invite handler configured\""
295                                            .into(),
296                                    )],
297                                    None,
298                                )
299                                .await
300                            {
301                                Ok(_) => (),
302                                Err(e) => {
303                                    info!("error replying to request: {:?}", e);
304                                }
305                            }
306                            continue;
307                        }
308                    };
309                    let local_addr = tx
310                        .connection
311                        .as_ref()
312                        .map(|connection| connection.get_addr().clone())
313                        .or_else(|| dialog_layer.endpoint.get_addrs().first().cloned());
314                    let contact_username =
315                        tx.original.uri.auth.as_ref().map(|auth| auth.user.as_str());
316                    let contact = local_addr.as_ref().map(|addr| {
317                        build_public_contact_uri(
318                            &self.learned_public_address,
319                            self.auto_learn_public_address_enabled(),
320                            addr,
321                            contact_username,
322                            None,
323                        )
324                    });
325
326                    let dialog = match dialog_layer.get_or_create_server_invite(
327                        &tx,
328                        state_sender,
329                        None,
330                        contact,
331                    ) {
332                        Ok(d) => d,
333                        Err(e) => {
334                            // 481 Dialog/Transaction Does Not Exist
335                            info!("failed to obtain dialog: {:?}", e);
336                            match tx
337                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
338                                .await
339                            {
340                                Ok(_) => (),
341                                Err(e) => {
342                                    info!("error replying to request: {:?}", e);
343                                }
344                            }
345                            continue;
346                        }
347                    };
348
349                    let dialog_id = dialog.id();
350                    let dialog_id_str = dialog_id.to_string();
351                    let token = self.token.child_token();
352                    let pending_dialog = PendingDialog {
353                        token: token.clone(),
354                        dialog: dialog.clone(),
355                        state_receiver,
356                    };
357
358                    let guard = Arc::new(PendingDialogGuard::new(
359                        self.invitation.clone(),
360                        dialog_id,
361                        pending_dialog,
362                    ));
363
364                    let accept_timeout = self
365                        .config
366                        .accept_timeout
367                        .as_ref()
368                        .and_then(|t| parse_duration(t).ok())
369                        .unwrap_or_else(|| Duration::from_secs(60));
370
371                    let token_ref = token.clone();
372                    let guard_ref = guard.clone();
373                    crate::spawn(async move {
374                        select! {
375                            _ = token_ref.cancelled() => {}
376                            _ = tokio::time::sleep(accept_timeout) => {}
377                        }
378                        guard_ref.drop_async().await;
379                    });
380
381                    let mut dialog_ref = dialog.clone();
382                    let token_ref = token.clone();
383                    let routing_state = self.routing_state.clone();
384                    let dialog_for_reject = dialog.clone();
385                    let guard_ref = guard.clone();
386                    crate::spawn(async move {
387                        let invite_loop = async {
388                            match invitation_handler
389                                .on_invite(
390                                    dialog_id_str.clone(),
391                                    token.clone(),
392                                    dialog.clone(),
393                                    routing_state,
394                                )
395                                .await
396                            {
397                                Ok(_) => (),
398                                Err(e) => {
399                                    // Webhook failed, reject the call immediately
400                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
401                                    let reason = format!("Failed to process invite: {}", e);
402                                    if let Err(reject_err) = dialog_for_reject.reject(
403                                        Some(rsip::StatusCode::ServiceUnavailable),
404                                        Some(reason),
405                                    ) {
406                                        info!(
407                                            id = dialog_id_str,
408                                            "error rejecting call: {:?}", reject_err
409                                        );
410                                    }
411                                    // Cancel token to stop dialog handling
412                                    token.cancel();
413                                    guard_ref.drop_async().await;
414                                }
415                            }
416                        };
417                        select! {
418                            _ = token_ref.cancelled() => {}
419                            _ = async {
420                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
421                             } => {}
422                        }
423                    });
424                }
425                rsip::Method::Options => {
426                    info!(?key, "ignoring out-of-dialog OPTIONS request");
427                    continue;
428                }
429                _ => {
430                    info!(?key, "received request: {:?}", tx.original.method);
431                    match tx.reply(rsip::StatusCode::OK).await {
432                        Ok(_) => (),
433                        Err(e) => {
434                            info!("error replying to request: {:?}", e);
435                        }
436                    }
437                }
438            }
439        }
440        Ok(())
441    }
442
443    pub fn stop(&self) {
444        if self.shutting_down.swap(true, Ordering::Relaxed) {
445            return;
446        }
447        info!("stopping, marking as shutting down");
448        self.token.cancel();
449    }
450
451    pub async fn graceful_stop(&self) -> Result<()> {
452        if self.shutting_down.swap(true, Ordering::Relaxed) {
453            return Ok(());
454        }
455
456        info!("graceful stopping, marking as shutting down");
457        let timeout = self
458            .config
459            .graceful_shutdown
460            .map(|_| Duration::from_secs(10));
461
462        self.stop_registration(timeout).await?;
463        self.token.cancel();
464        Ok(())
465    }
466
467    pub async fn start_registration(&self) -> Result<usize> {
468        let mut count = 0;
469        if let Some(register_users) = &self.config.register_users {
470            for option in register_users.iter() {
471                match self.register(option.clone()).await {
472                    Ok(_) => {
473                        count += 1;
474                    }
475                    Err(e) => {
476                        warn!("failed to register user: {:?} {:?}", e, option);
477                    }
478                }
479            }
480        }
481        Ok(count)
482    }
483
484    pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
485        let callee_uri = callee
486            .strip_prefix("sip:")
487            .or_else(|| callee.strip_prefix("sips:"))
488            .unwrap_or(callee);
489        let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
490            format!("sip:{}", callee_uri)
491        } else {
492            callee_uri.to_string()
493        };
494
495        let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
496            Ok(uri) => uri,
497            Err(e) => {
498                warn!("failed to parse callee URI: {} {:?}", callee, e);
499                return None;
500            }
501        };
502
503        let callee_host = match &parsed_callee.host_with_port.host {
504            rsip::Host::Domain(domain) => domain.to_string(),
505            rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
506        };
507
508        // Look through registered users to find one matching this domain
509        if let Some(register_users) = &self.config.register_users {
510            for option in register_users.iter() {
511                let mut server = option.server.clone();
512                if !server.starts_with("sip:") && !server.starts_with("sips:") {
513                    server = format!("sip:{}", server);
514                }
515
516                let parsed_server = match rsip::Uri::try_from(server.as_str()) {
517                    Ok(uri) => uri,
518                    Err(e) => {
519                        warn!("failed to parse server URI: {} {:?}", option.server, e);
520                        continue;
521                    }
522                };
523
524                let server_host = match &parsed_server.host_with_port.host {
525                    rsip::Host::Domain(domain) => domain.to_string(),
526                    rsip::Host::IpAddr(ip) => {
527                        // Compare IP addresses
528                        if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
529                            if ip == callee_ip {
530                                if let Some(cred) = &option.credential {
531                                    info!(
532                                        callee,
533                                        username = cred.username,
534                                        server = option.server,
535                                        "Auto-injecting credentials from registered user for outbound call (IP match)"
536                                    );
537                                    return Some(cred.clone());
538                                }
539                            }
540                        }
541                        continue;
542                    }
543                };
544
545                if server_host == callee_host {
546                    if let Some(cred) = &option.credential {
547                        info!(
548                            callee,
549                            username = cred.username,
550                            server = option.server,
551                            "Auto-injecting credentials from registered user for outbound call"
552                        );
553                        return Some(cred.clone());
554                    }
555                }
556            }
557        }
558
559        None
560    }
561
562    /// Helper function to find credentials by IP address
563    fn find_credentials_by_ip(
564        &self,
565        callee_ip: &std::net::IpAddr,
566    ) -> Option<crate::useragent::registration::UserCredential> {
567        if let Some(register_users) = &self.config.register_users {
568            for option in register_users.iter() {
569                let mut server = option.server.clone();
570                if !server.starts_with("sip:") && !server.starts_with("sips:") {
571                    server = format!("sip:{}", server);
572                }
573
574                if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
575                    if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
576                        if server_ip == callee_ip {
577                            if let Some(cred) = &option.credential {
578                                info!(
579                                    callee_ip = %callee_ip,
580                                    username = cred.username,
581                                    server = option.server,
582                                    "Auto-injecting credentials from registered user for outbound call (IP match)"
583                                );
584                                return Some(cred.clone());
585                            }
586                        }
587                    }
588                }
589            }
590        }
591        None
592    }
593
594    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
595        {
596            let mut handles = self.registration_handles.lock().await;
597            for (_, cancel_token) in handles.drain() {
598                cancel_token.cancel();
599            }
600        }
601
602        if let Some(duration) = wait_for_clear {
603            let live_users = self.alive_users.clone();
604            let check_loop = async move {
605                loop {
606                    let is_empty = {
607                        let users = live_users
608                            .read()
609                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
610                        users.is_empty()
611                    };
612                    if is_empty {
613                        break;
614                    }
615                    tokio::time::sleep(Duration::from_millis(50)).await;
616                }
617                Ok::<(), anyhow::Error>(())
618            };
619            match tokio::time::timeout(duration, check_loop).await {
620                Ok(_) => {}
621                Err(e) => {
622                    warn!("failed to wait for clear: {}", e);
623                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
624                }
625            }
626        }
627        Ok(())
628    }
629
630    pub async fn register(&self, option: RegisterOption) -> Result<()> {
631        let user = option.aor();
632        let mut server = option.server.clone();
633        if !server.starts_with("sip:") && !server.starts_with("sips:") {
634            server = format!("sip:{}", server);
635        }
636        let sip_server = match rsip::Uri::try_from(server) {
637            Ok(uri) => uri,
638            Err(e) => {
639                warn!("failed to parse server: {} {:?}", e, option.server);
640                return Err(anyhow::anyhow!("failed to parse server: {}", e));
641            }
642        };
643        let cancel_token = self.token.child_token();
644        let credential = option.credential.clone().map(|c| c.into());
645        let registration = rsipstack::dialog::registration::Registration::new(
646            self.endpoint.inner.clone(),
647            credential,
648        );
649        let mut handle = RegistrationHandle {
650            registration,
651            option,
652            cancel_token: cancel_token.clone(),
653            start_time: Instant::now(),
654            last_update: Instant::now(),
655            last_response: None,
656        };
657        self.registration_handles
658            .lock()
659            .await
660            .insert(user.clone(), cancel_token);
661        tracing::debug!(user = user.as_str(), "starting registration task");
662        let alive_users = self.alive_users.clone();
663
664        crate::spawn(async move {
665            handle.start_time = Instant::now();
666            let cancel_token = handle.cancel_token.clone();
667            let addrs = handle.registration.endpoint.get_addrs();
668            let local_bind_addr = if let Some(addr) = find_local_addr_for_uri(&addrs, &sip_server) {
669                addr
670            } else {
671                warn!(
672                    user = user.as_str(),
673                    server = %sip_server,
674                    "failed to get local bind address for registration transport"
675                );
676                alive_users.write().unwrap().remove(&user);
677                return;
678            };
679            let user = handle.option.aor();
680            alive_users.write().unwrap().remove(&user);
681            let mut contact_address = local_bind_addr.addr.clone();
682            let mut contact = build_contact(
683                &local_bind_addr,
684                Some(contact_address.clone()),
685                Some(handle.option.username.as_str()),
686                None,
687            );
688            let mut should_register = true;
689            let mut timer = pending().boxed();
690
691            loop {
692                select! {
693                    _ = cancel_token.cancelled() => {
694                        break;
695                    }
696                    _ = timer.as_mut(), if !should_register => {
697                        should_register = true;
698                        timer = Box::pin(pending());
699                    }
700                    result = handle.do_register(&sip_server, None, &contact), if should_register => {
701                        match result {
702                            Ok((expires, new_addr)) => {
703                                if handle
704                                    .should_retry_registration_now(
705                                        &local_bind_addr,
706                                        &contact_address,
707                                        new_addr.as_ref(),
708                                    )
709                                {
710                                    if let Some(next_contact_address) = new_addr {
711                                        info!(
712                                            user = user.as_str(),
713                                            current_contact = %contact_address,
714                                            next_contact = %next_contact_address,
715                                            "public address changed, retrying registration immediately",
716                                        );
717                                        contact_address = next_contact_address;
718                                        contact = build_contact(
719                                            &local_bind_addr,
720                                            Some(contact_address.clone()),
721                                            Some(handle.option.username.as_str()),
722                                            None,
723                                        );
724                                        continue;
725                                    }
726                                }
727                                info!(
728                                    user = user.as_str(),
729                                    expires = expires,
730                                    contact = %contact_address,
731                                    alive_users = alive_users.read().unwrap().len(),
732                                    "registration refreshed",
733                                );
734                                alive_users.write().unwrap().insert(user.clone());
735                                should_register = false;
736                                timer = Box::pin(tokio::time::sleep(Duration::from_secs(
737                                    (expires * 3 / 4) as u64,
738                                )));
739                            }
740                            Err(e) => {
741                                warn!(
742                                    user = user.as_str(),
743                                    alive_users = alive_users.read().unwrap().len(),
744                                    "registration failed: {:?}", e
745                                );
746                                should_register = false;
747                                timer = Box::pin(tokio::time::sleep(Duration::from_secs(60)));
748                            }
749                        }
750                    }
751                }
752            }
753            handle
754                .do_register(&sip_server, Some(0), &contact)
755                .await
756                .ok();
757            alive_users.write().unwrap().remove(&user);
758        });
759        Ok(())
760    }
761}
762
763impl Drop for AppStateInner {
764    fn drop(&mut self) {
765        self.stop();
766    }
767}
768
769impl AppStateBuilder {
770    pub fn new() -> Self {
771        Self {
772            config: None,
773            stream_engine: None,
774            callrecord_sender: None,
775            callrecord_formatter: None,
776            cancel_token: None,
777            create_invitation_handler: None,
778            config_path: None,
779            message_inspector: None,
780            target_locator: None,
781            transport_inspector: None,
782        }
783    }
784
785    pub fn with_config(mut self, config: Config) -> Self {
786        self.config = Some(config);
787        self
788    }
789
790    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
791        self.stream_engine = Some(stream_engine);
792        self
793    }
794
795    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
796        self.callrecord_sender = Some(sender);
797        self
798    }
799
800    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
801        self.cancel_token = Some(token);
802        self
803    }
804
805    pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
806        self.config_path = path;
807        self
808    }
809
810    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
811        self.message_inspector = Some(inspector);
812        self
813    }
814    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
815        self.target_locator = Some(locator);
816        self
817    }
818
819    pub fn with_transport_inspector(
820        &mut self,
821        inspector: Box<dyn TransportEventInspector>,
822    ) -> &mut Self {
823        self.transport_inspector = Some(inspector);
824        self
825    }
826
827    pub async fn build(self) -> Result<AppState> {
828        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
829        let token = self
830            .cancel_token
831            .unwrap_or_else(|| CancellationToken::new());
832        let _ = set_cache_dir(&config.media_cache_path);
833        let local_ip = if !config.addr.is_empty() {
834            std::net::IpAddr::from_str(config.addr.as_str())?
835        } else {
836            crate::net_tool::get_first_non_loopback_interface()?
837        };
838        let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
839        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
840
841        // Create UDP socket with SO_REUSEPORT for graceful restarts
842        #[cfg(unix)]
843        let std_socket = {
844            use socket2::{Domain, Protocol, SockAddr, Socket, Type};
845
846            let domain = if local_addr.is_ipv4() {
847                Domain::IPV4
848            } else {
849                Domain::IPV6
850            };
851            let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
852                .map_err(|err| anyhow::anyhow!("Failed to create UDP socket: {}", err))?;
853
854            socket
855                .set_reuse_address(true)
856                .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEADDR: {}", err))?;
857
858            // SO_REUSEPORT (Linux/BSD)
859            #[cfg(not(any(target_os = "solaris", target_os = "illumos", target_os = "cygwin")))]
860            socket
861                .set_reuse_port(true)
862                .map_err(|err| anyhow::anyhow!("Failed to set SO_REUSEPORT: {}", err))?;
863
864            socket
865                .bind(&SockAddr::from(local_addr))
866                .map_err(|err| anyhow::anyhow!("Failed to bind UDP socket: {}", err))?;
867
868            let std_socket: std::net::UdpSocket = socket.into();
869            std_socket
870        };
871
872        #[cfg(not(unix))]
873        let std_socket = std::net::UdpSocket::bind(local_addr)?;
874
875        std_socket.set_nonblocking(true)?;
876        let tokio_socket = tokio::net::UdpSocket::from_std(std_socket)?;
877        // Use the actual bound address (important when port=0 lets OS assign a port)
878        let actual_addr = tokio_socket.local_addr()?;
879        let bind_addr = rsipstack::transport::SipConnection::resolve_bind_address(actual_addr);
880        let mut learned_public_address: SharedPublicAddress =
881            Arc::new(ArcSwap::from_pointee(bind_addr.into()));
882
883        let udp_inner = rsipstack::transport::udp::UdpInner {
884            conn: tokio_socket,
885            addr: rsipstack::transport::SipAddr {
886                r#type: Some(rsip::transport::Transport::Udp),
887                addr: bind_addr.into(),
888            },
889        };
890
891        let external = config
892            .external_ip
893            .as_ref()
894            .map(|ip| {
895                format!("{}:{}", ip, actual_addr.port())
896                    .parse()
897                    .map_err(|e| anyhow::anyhow!("Failed to parse external address: {}", e))
898            })
899            .transpose()?;
900
901        let udp_conn = rsipstack::transport::udp::UdpConnection::attach(
902            udp_inner,
903            external,
904            Some(token.child_token()),
905        )
906        .await;
907
908        info!(
909            "start useragent, addr: {} (SO_REUSEPORT enabled)",
910            udp_conn.get_addr()
911        );
912
913        transport_layer.add_transport(udp_conn.into());
914
915        // Optional SIP over TLS transport
916        if let Some(tls_port) = config.tls_port {
917            let tls_addr: std::net::SocketAddr = format!("{}:{}", local_ip, tls_port).parse()?;
918            let tls_sip_addr = rsipstack::transport::SipAddr {
919                r#type: Some(rsip::transport::Transport::Tls),
920                addr: tls_addr.into(),
921            };
922            let mut tls_cfg = rsipstack::transport::tls::TlsConfig::default();
923            if let Some(ref cert_path) = config.tls_cert_file {
924                tls_cfg.cert = Some(
925                    std::fs::read(cert_path)
926                        .map_err(|e| anyhow::anyhow!("tls_cert_file: {}", e))?,
927                );
928            }
929            if let Some(ref key_path) = config.tls_key_file {
930                tls_cfg.key = Some(
931                    std::fs::read(key_path).map_err(|e| anyhow::anyhow!("tls_key_file: {}", e))?,
932                );
933            }
934            let external_tls_addr = config
935                .external_ip
936                .as_ref()
937                .and_then(|ip| format!("{}:{}", ip, tls_port).parse().ok());
938            match rsipstack::transport::tls::TlsListenerConnection::new(
939                tls_sip_addr,
940                external_tls_addr,
941                tls_cfg,
942            )
943            .await
944            {
945                Ok(tls_conn) => {
946                    transport_layer.add_transport(tls_conn.into());
947                    info!("TLS SIP transport started on {}:{}", local_ip, tls_port);
948                }
949                Err(e) => {
950                    return Err(anyhow::anyhow!("Failed to start TLS SIP transport: {}", e));
951                }
952            }
953        }
954
955        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
956        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
957        if let Some(ref user_agent) = config.useragent {
958            endpoint_builder.with_user_agent(user_agent.as_str());
959        }
960
961        let mut endpoint_builder = endpoint_builder
962            .with_cancel_token(token.child_token())
963            .with_transport_layer(transport_layer)
964            .with_option(endpoint_option);
965
966        if config.auto_learn_public_address.unwrap_or_default() {
967            let inspector = LearningMessageInspector::new(bind_addr.into(), self.message_inspector);
968            learned_public_address = inspector.shared_public_address();
969            endpoint_builder = endpoint_builder.with_inspector(Box::new(inspector));
970        } else if let Some(inspector) = self.message_inspector {
971            endpoint_builder = endpoint_builder.with_inspector(inspector);
972        }
973
974        if let Some(locator) = self.target_locator {
975            endpoint_builder.with_target_locator(locator);
976        } else if let Some(ref rules) = config.rewrites {
977            endpoint_builder
978                .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
979        }
980
981        if let Some(inspector) = self.transport_inspector {
982            endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
983        }
984
985        let endpoint = endpoint_builder.build();
986        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
987
988        let stream_engine = self.stream_engine.unwrap_or_default();
989
990        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
991            formatter
992        } else {
993            let formatter = if let Some(ref callrecord) = config.callrecord {
994                DefaultCallRecordFormatter::new_with_config(callrecord)
995            } else {
996                DefaultCallRecordFormatter::default()
997            };
998            Arc::new(formatter)
999        };
1000
1001        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
1002            Some(sender)
1003        } else if let Some(ref callrecord) = config.callrecord {
1004            let builder = CallRecordManagerBuilder::new()
1005                .with_cancel_token(token.child_token())
1006                .with_config(callrecord.clone())
1007                .with_max_concurrent(32)
1008                .with_formatter(callrecord_formatter.clone());
1009
1010            let mut callrecord_manager = builder.build();
1011            let sender = callrecord_manager.sender.clone();
1012            crate::spawn(async move {
1013                callrecord_manager.serve().await;
1014            });
1015            Some(sender)
1016        } else {
1017            None
1018        };
1019
1020        let app_state = Arc::new(AppStateInner {
1021            config,
1022            token,
1023            stream_engine,
1024            callrecord_sender,
1025            endpoint,
1026            registration_handles: Mutex::new(HashMap::new()),
1027            alive_users: Arc::new(RwLock::new(HashSet::new())),
1028            dialog_layer: dialog_layer.clone(),
1029            create_invitation_handler: self.create_invitation_handler,
1030            invitation: Invitation::new(dialog_layer),
1031            routing_state: Arc::new(crate::call::RoutingState::new()),
1032            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
1033            learned_public_address,
1034            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
1035            total_calls: AtomicU64::new(0),
1036            total_failed_calls: AtomicU64::new(0),
1037            uptime: Local::now(),
1038            shutting_down: Arc::new(AtomicBool::new(false)),
1039        });
1040
1041        Ok(app_state)
1042    }
1043}