Skip to main content

active_call/
app.rs

1use crate::media::{cache::set_cache_dir, engine::StreamEngine};
2use crate::{
3    call::{ActiveCallRef, sip::Invitation},
4    callrecord::{
5        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
6    },
7    config::Config,
8    locator::RewriteTargetLocator,
9    useragent::{
10        RegisterOption,
11        invitation::{
12            FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
13            default_create_invite_handler,
14        },
15        registration::{RegistrationHandle, UserCredential},
16    },
17};
18use anyhow::Result;
19use chrono::{DateTime, Local};
20use futures::{FutureExt, future};
21use humantime::parse_duration;
22use rsip::prelude::HeadersExt;
23use rsipstack::transaction::{
24    Endpoint, TransactionReceiver,
25    endpoint::{TargetLocator, TransportEventInspector},
26};
27use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
28use std::collections::HashSet;
29use std::str::FromStr;
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use std::{collections::HashMap, net::SocketAddr};
33use std::{path::Path, sync::atomic::AtomicU64};
34use tokio::select;
35use tokio::sync::Mutex;
36use tokio_util::sync::CancellationToken;
37use tracing::{info, warn};
38
/// Shared state of the SIP user agent.
///
/// Normally handled through the [`AppState`] alias (`Arc<AppStateInner>`).
/// Dropping the value cancels `token` via the `Drop` implementation.
pub struct AppStateInner {
    /// Application configuration, shared immutably.
    pub config: Arc<Config>,
    /// Root cancellation token. `stop()` cancels it; `serve`, the per-dialog
    /// handling and each registration task derive child tokens from it.
    pub token: CancellationToken,
    /// Token handed to the transport layer and endpoint at build time. It is
    /// created independently of `token` and cancelled when `serve` returns.
    pub endpoint_token: CancellationToken,
    /// Stream engine supplied to the builder, or its `Default` value.
    pub stream_engine: Arc<StreamEngine>,
    /// Sender for call records; `None` when neither a sender nor a
    /// `callrecord` configuration section was provided to the builder.
    pub callrecord_sender: Option<CallRecordSender>,
    /// SIP transaction endpoint.
    pub endpoint: Endpoint,
    /// Registration handles keyed by the address-of-record returned by
    /// `RegisterOption::aor()`.
    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
    /// Addresses-of-record whose most recent REGISTER succeeded. An entry is
    /// removed before every refresh attempt and when its task exits.
    pub alive_users: Arc<RwLock<HashSet<String>>>,
    /// Dialog layer built on top of `endpoint`.
    pub dialog_layer: Arc<DialogLayer>,
    /// Optional factory for the invitation handler; when `None`,
    /// `default_create_invite_handler` is used for incoming INVITEs.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Invitation registry created from the dialog layer; pending dialogs are
    /// tracked through it via `PendingDialogGuard`.
    pub invitation: Invitation,
    /// Routing state passed to the invitation handler on every INVITE.
    pub routing_state: Arc<crate::call::RoutingState>,
    /// String-to-string map, empty at build time.
    /// NOTE(review): not read in this file; presumably maps a session or call
    /// id to a playbook name — confirm against callers.
    pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,

    /// Currently active calls, empty at build time.
    /// NOTE(review): not read in this file; key is presumably the session id.
    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
    /// Counter initialised to 0 at build time; updated outside this file.
    pub total_calls: AtomicU64,
    /// Counter initialised to 0 at build time; updated outside this file.
    pub total_failed_calls: AtomicU64,
    /// Local time at which the state was built (i.e. the start time, from
    /// which an uptime can be derived).
    pub uptime: DateTime<Local>,
}
59
/// Reference-counted handle to the shared [`AppStateInner`].
pub type AppState = Arc<AppStateInner>;
61
/// Builder for [`AppState`].
///
/// Every field is optional; `build` substitutes a default for anything left
/// unset.
pub struct AppStateBuilder {
    /// Configuration; `Config::default()` is used when `None`.
    pub config: Option<Config>,
    /// Stream engine; the `Default` value is used when `None`.
    pub stream_engine: Option<Arc<StreamEngine>>,
    /// Externally managed call-record sender. When `None` and the
    /// configuration has a `callrecord` section, `build` spawns its own
    /// call-record manager.
    pub callrecord_sender: Option<CallRecordSender>,
    /// Formatter handed to the call-record manager spawned by `build`;
    /// a `DefaultCallRecordFormatter` is used when `None`.
    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
    /// Root cancellation token; a fresh one is created when `None`.
    pub cancel_token: Option<CancellationToken>,
    /// Optional factory for the invitation handler, copied into the state.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Path the configuration was loaded from.
    /// NOTE(review): stored by `with_config_metadata` but not read by `build`.
    pub config_path: Option<String>,

    /// Optional SIP message inspector installed on the endpoint.
    pub message_inspector: Option<Box<dyn MessageInspector>>,
    /// Optional target locator; takes precedence over the `rewrites`
    /// configuration when both are present.
    pub target_locator: Option<Box<dyn TargetLocator>>,
    /// Optional transport event inspector installed on the endpoint.
    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
}
75
76impl AppStateInner {
77    pub fn get_dump_events_file(&self, session_id: &String) -> String {
78        let recorder_root = self.config.recorder_path();
79        let root = Path::new(&recorder_root);
80        if !root.exists() {
81            match std::fs::create_dir_all(root) {
82                Ok(_) => {
83                    info!("created dump events root: {}", root.to_string_lossy());
84                }
85                Err(e) => {
86                    warn!(
87                        "Failed to create dump events root: {} {}",
88                        e,
89                        root.to_string_lossy()
90                    );
91                }
92            }
93        }
94        root.join(format!("{}.events.jsonl", session_id))
95            .to_string_lossy()
96            .to_string()
97    }
98
99    pub fn get_recorder_file(&self, session_id: &String) -> String {
100        let recorder_root = self.config.recorder_path();
101        let root = Path::new(&recorder_root);
102        if !root.exists() {
103            match std::fs::create_dir_all(root) {
104                Ok(_) => {
105                    info!("created recorder root: {}", root.to_string_lossy());
106                }
107                Err(e) => {
108                    warn!(
109                        "Failed to create recorder root: {} {}",
110                        e,
111                        root.to_string_lossy()
112                    );
113                }
114            }
115        }
116        let desired_ext = self.config.recorder_format().extension();
117        let mut filename = session_id.clone();
118        if !filename
119            .to_lowercase()
120            .ends_with(&format!(".{}", desired_ext.to_lowercase()))
121        {
122            filename = format!("{}.{}", filename, desired_ext);
123        }
124        root.join(filename).to_string_lossy().to_string()
125    }
126
    /// Runs the user agent until it is cancelled or one of its tasks ends.
    ///
    /// Starts the configured registrations, then concurrently drives the SIP
    /// endpoint and the incoming-transaction dispatcher. Once a child of
    /// `self.token` is cancelled, registrations are stopped and the loop exits
    /// when that has completed. `endpoint_token` is cancelled on the way out.
    ///
    /// # Errors
    ///
    /// Only fails when the endpoint's incoming-transaction receiver cannot be
    /// obtained; every later failure is logged and the function returns
    /// `Ok(())`.
    pub async fn serve(self: Arc<Self>) -> Result<()> {
        let incoming_txs = self.endpoint.incoming_transactions()?;
        let token = self.token.child_token();
        let endpoint_inner = self.endpoint.inner.clone();
        let dialog_layer = self.dialog_layer.clone();
        let app_state_clone = self.clone();

        // A registration failure is not fatal: the agent still serves calls.
        match self.start_registration().await {
            Ok(count) => {
                info!("registration started, count: {}", count);
            }
            Err(e) => {
                warn!("failed to start registration: {:?}", e);
            }
        }

        let serving = endpoint_inner.serve();
        let processing =
            app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs);
        let mut cancelled = false;
        // Placeholder that never resolves; it is replaced by the real
        // stop-registration future once cancellation has been observed.
        let stop_registration = future::pending().boxed();
        tokio::pin!(serving);
        tokio::pin!(processing);
        tokio::pin!(stop_registration);

        loop {
            tokio::select! {
                // Guarded by `cancelled` so the branch fires only once.
                _ = token.cancelled(), if !cancelled=> {
                    cancelled = true;
                    // NOTE(review): the value inside `graceful_shutdown` is
                    // ignored — any `Some(..)` yields a 5 second wait.
                    let timeout = self
                        .config
                        .graceful_shutdown
                        .map(|_| Duration::from_secs(5));
                    *stop_registration = self.stop_registration(timeout).boxed();
                }
                result = &mut serving => {
                    if let Err(e) = result {
                        info!("endpoint serve error: {:?}", e);
                    }
                    break;
                }
                result = &mut processing => {
                    if let Err(e) = result {
                        info!("process incoming request error: {:?}", e);
                    }
                    break;
                },
                result = &mut stop_registration => {
                    match result {
                        Ok(_) => {
                            info!("registration stopped, waiting for clear");
                        }
                        Err(e) => {
                            warn!("failed to stop registration: {:?}", e);
                        }
                    }
                    break;
                },
            }
        }

        // Shut the transport/endpoint down regardless of why the loop ended.
        self.endpoint_token.cancel();
        info!("stopping");
        Ok(())
    }
192
193    async fn process_incoming_request(
194        self: Arc<Self>,
195        dialog_layer: Arc<DialogLayer>,
196        mut incoming: TransactionReceiver,
197    ) -> Result<()> {
198        while let Some(mut tx) = incoming.recv().await {
199            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
200            info!(?key, "received transaction");
201            if tx.original.to_header()?.tag()?.as_ref().is_some() {
202                match dialog_layer.match_dialog(&tx) {
203                    Some(mut d) => {
204                        crate::spawn(async move {
205                            match d.handle(&mut tx).await {
206                                Ok(_) => (),
207                                Err(e) => {
208                                    info!("error handling transaction: {:?}", e);
209                                }
210                            }
211                        });
212                        continue;
213                    }
214                    None => {
215                        info!("dialog not found: {}", tx.original);
216                        match tx
217                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
218                            .await
219                        {
220                            Ok(_) => (),
221                            Err(e) => {
222                                info!("error replying to request: {:?}", e);
223                            }
224                        }
225                        continue;
226                    }
227                }
228            }
229            // out dialog, new server dialog
230            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
231            match tx.original.method {
232                rsip::Method::Invite | rsip::Method::Ack => {
233                    let invitation_handler = match self.create_invitation_handler {
234                        Some(ref create_invitation_handler) => {
235                            create_invitation_handler(self.config.handler.as_ref()).ok()
236                        }
237                        _ => default_create_invite_handler(
238                            self.config.handler.as_ref(),
239                            Some(self.clone()),
240                        ),
241                    };
242                    let invitation_handler = match invitation_handler {
243                        Some(h) => h,
244                        None => {
245                            info!(?key, "no invite handler configured, rejecting INVITE");
246                            match tx
247                                .reply_with(
248                                    rsip::StatusCode::ServiceUnavailable,
249                                    vec![rsip::Header::Other(
250                                        "Reason".into(),
251                                        "SIP;cause=503;text=\"No invite handler configured\""
252                                            .into(),
253                                    )],
254                                    None,
255                                )
256                                .await
257                            {
258                                Ok(_) => (),
259                                Err(e) => {
260                                    info!("error replying to request: {:?}", e);
261                                }
262                            }
263                            continue;
264                        }
265                    };
266                    let contact = dialog_layer
267                        .endpoint
268                        .get_addrs()
269                        .first()
270                        .map(|addr| rsip::Uri {
271                            scheme: Some(rsip::Scheme::Sip),
272                            auth: None,
273                            host_with_port: addr.addr.clone(),
274                            params: vec![],
275                            headers: vec![],
276                        });
277
278                    let dialog = match dialog_layer.get_or_create_server_invite(
279                        &tx,
280                        state_sender,
281                        None,
282                        contact,
283                    ) {
284                        Ok(d) => d,
285                        Err(e) => {
286                            // 481 Dialog/Transaction Does Not Exist
287                            info!("failed to obtain dialog: {:?}", e);
288                            match tx
289                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
290                                .await
291                            {
292                                Ok(_) => (),
293                                Err(e) => {
294                                    info!("error replying to request: {:?}", e);
295                                }
296                            }
297                            continue;
298                        }
299                    };
300
301                    let dialog_id = dialog.id();
302                    let dialog_id_str = dialog_id.to_string();
303                    let token = self.token.child_token();
304                    let pending_dialog = PendingDialog {
305                        token: token.clone(),
306                        dialog: dialog.clone(),
307                        state_receiver,
308                    };
309
310                    let guard = Arc::new(PendingDialogGuard::new(
311                        self.invitation.clone(),
312                        dialog_id,
313                        pending_dialog,
314                    ));
315
316                    let accept_timeout = self
317                        .config
318                        .accept_timeout
319                        .as_ref()
320                        .and_then(|t| parse_duration(t).ok())
321                        .unwrap_or_else(|| Duration::from_secs(60));
322
323                    let token_ref = token.clone();
324                    let guard_ref = guard.clone();
325                    crate::spawn(async move {
326                        select! {
327                            _ = token_ref.cancelled() => {}
328                            _ = tokio::time::sleep(accept_timeout) => {}
329                        }
330                        guard_ref.drop_async().await;
331                    });
332
333                    let mut dialog_ref = dialog.clone();
334                    let token_ref = token.clone();
335                    let routing_state = self.routing_state.clone();
336                    crate::spawn(async move {
337                        let invite_loop = async {
338                            match invitation_handler
339                                .on_invite(
340                                    dialog_id_str.clone(),
341                                    token,
342                                    dialog.clone(),
343                                    routing_state,
344                                )
345                                .await
346                            {
347                                Ok(_) => (),
348                                Err(e) => {
349                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
350                                    guard.drop_async().await;
351                                }
352                            }
353                        };
354                        select! {
355                            _ = token_ref.cancelled() => {}
356                            _ = async {
357                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
358                             } => {}
359                        }
360                    });
361                }
362                rsip::Method::Options => {
363                    info!(?key, "ignoring out-of-dialog OPTIONS request");
364                    continue;
365                }
366                _ => {
367                    info!(?key, "received request: {:?}", tx.original.method);
368                    match tx.reply(rsip::StatusCode::OK).await {
369                        Ok(_) => (),
370                        Err(e) => {
371                            info!("error replying to request: {:?}", e);
372                        }
373                    }
374                }
375            }
376        }
377        Ok(())
378    }
379
    /// Requests shutdown by cancelling the root token.
    ///
    /// Child tokens derived from it (in `serve`, per dialog and per
    /// registration) observe the cancellation; this call itself does not wait
    /// for anything to finish.
    pub fn stop(&self) {
        info!("stopping");
        self.token.cancel();
    }
384
385    pub async fn start_registration(&self) -> Result<usize> {
386        let mut count = 0;
387        if let Some(register_users) = &self.config.register_users {
388            for option in register_users.iter() {
389                match self.register(option.clone()).await {
390                    Ok(_) => {
391                        count += 1;
392                    }
393                    Err(e) => {
394                        warn!("failed to register user: {:?} {:?}", e, option);
395                    }
396                }
397            }
398        }
399        Ok(count)
400    }
401
402    pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
403        let callee_uri = callee
404            .strip_prefix("sip:")
405            .or_else(|| callee.strip_prefix("sips:"))
406            .unwrap_or(callee);
407        let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
408            format!("sip:{}", callee_uri)
409        } else {
410            callee_uri.to_string()
411        };
412
413        let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
414            Ok(uri) => uri,
415            Err(e) => {
416                warn!("failed to parse callee URI: {} {:?}", callee, e);
417                return None;
418            }
419        };
420
421        let callee_host = match &parsed_callee.host_with_port.host {
422            rsip::Host::Domain(domain) => domain.to_string(),
423            rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
424        };
425
426        // Look through registered users to find one matching this domain
427        if let Some(register_users) = &self.config.register_users {
428            for option in register_users.iter() {
429                let mut server = option.server.clone();
430                if !server.starts_with("sip:") && !server.starts_with("sips:") {
431                    server = format!("sip:{}", server);
432                }
433
434                let parsed_server = match rsip::Uri::try_from(server.as_str()) {
435                    Ok(uri) => uri,
436                    Err(e) => {
437                        warn!("failed to parse server URI: {} {:?}", option.server, e);
438                        continue;
439                    }
440                };
441
442                let server_host = match &parsed_server.host_with_port.host {
443                    rsip::Host::Domain(domain) => domain.to_string(),
444                    rsip::Host::IpAddr(ip) => {
445                        // Compare IP addresses
446                        if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
447                            if ip == callee_ip {
448                                if let Some(cred) = &option.credential {
449                                    info!(
450                                        callee,
451                                        username = cred.username,
452                                        server = option.server,
453                                        "Auto-injecting credentials from registered user for outbound call (IP match)"
454                                    );
455                                    return Some(cred.clone());
456                                }
457                            }
458                        }
459                        continue;
460                    }
461                };
462
463                if server_host == callee_host {
464                    if let Some(cred) = &option.credential {
465                        info!(
466                            callee,
467                            username = cred.username,
468                            server = option.server,
469                            "Auto-injecting credentials from registered user for outbound call"
470                        );
471                        return Some(cred.clone());
472                    }
473                }
474            }
475        }
476
477        None
478    }
479
480    /// Helper function to find credentials by IP address
481    fn find_credentials_by_ip(
482        &self,
483        callee_ip: &std::net::IpAddr,
484    ) -> Option<crate::useragent::registration::UserCredential> {
485        if let Some(register_users) = &self.config.register_users {
486            for option in register_users.iter() {
487                let mut server = option.server.clone();
488                if !server.starts_with("sip:") && !server.starts_with("sips:") {
489                    server = format!("sip:{}", server);
490                }
491
492                if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
493                    if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
494                        if server_ip == callee_ip {
495                            if let Some(cred) = &option.credential {
496                                info!(
497                                    callee_ip = %callee_ip,
498                                    username = cred.username,
499                                    server = option.server,
500                                    "Auto-injecting credentials from registered user for outbound call (IP match)"
501                                );
502                                return Some(cred.clone());
503                            }
504                        }
505                    }
506                }
507            }
508        }
509        None
510    }
511
512    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
513        for (_, handle) in self.registration_handles.lock().await.iter_mut() {
514            handle.stop();
515        }
516
517        if let Some(duration) = wait_for_clear {
518            let live_users = self.alive_users.clone();
519            let check_loop = async move {
520                loop {
521                    let is_empty = {
522                        let users = live_users
523                            .read()
524                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
525                        users.is_empty()
526                    };
527                    if is_empty {
528                        break;
529                    }
530                    tokio::time::sleep(Duration::from_millis(50)).await;
531                }
532                Ok::<(), anyhow::Error>(())
533            };
534            match tokio::time::timeout(duration, check_loop).await {
535                Ok(_) => {}
536                Err(e) => {
537                    warn!("failed to wait for clear: {}", e);
538                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
539                }
540            }
541        }
542        Ok(())
543    }
544
    /// Starts a background registration task for `option`.
    ///
    /// The server address is parsed (a missing scheme defaults to `sip:`), a
    /// [`RegistrationHandle`] is stored under the option's address-of-record,
    /// and a task is spawned that keeps re-registering until its token is
    /// cancelled. On exit the task sends a final REGISTER with expiry 0 and
    /// removes the user from `alive_users`.
    ///
    /// # Errors
    ///
    /// Fails only when the server address cannot be parsed; registration
    /// failures inside the task are logged and retried.
    pub async fn register(&self, option: RegisterOption) -> Result<()> {
        let user = option.aor();
        let mut server = option.server.clone();
        if !server.starts_with("sip:") && !server.starts_with("sips:") {
            server = format!("sip:{}", server);
        }
        let sip_server = match rsip::Uri::try_from(server) {
            Ok(uri) => uri,
            Err(e) => {
                warn!("failed to parse server: {} {:?}", e, option.server);
                return Err(anyhow::anyhow!("failed to parse server: {}", e));
            }
        };
        // Child of the root token, so `stop()` also ends this registration.
        let cancel_token = self.token.child_token();
        let handle = RegistrationHandle {
            inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
                endpoint_inner: self.endpoint.inner.clone(),
                option,
                cancel_token,
                start_time: Mutex::new(std::time::Instant::now()),
                last_update: Mutex::new(std::time::Instant::now()),
                last_response: Mutex::new(None),
            }),
        };
        // An existing handle for the same address-of-record is replaced.
        self.registration_handles
            .lock()
            .await
            .insert(user.clone(), handle.clone());
        tracing::debug!(user = user.as_str(), "starting registration task");
        let alive_users = self.alive_users.clone();

        crate::spawn(async move {
            *handle.inner.start_time.lock().await = std::time::Instant::now();

            select! {
                _ = handle.inner.cancel_token.cancelled() => {
                }
                _ = async {
                    loop {
                        let user = handle.inner.option.aor();
                        // The user counts as not alive while a refresh is in
                        // flight; it is re-added only after success.
                        // NOTE(review): `unwrap()` panics this task if the
                        // lock was poisoned by another thread.
                        alive_users.write().unwrap().remove(&user);
                        let refresh_time = match handle.do_register(&sip_server, None).await {
                            Ok(expires) => {
                                info!(
                                    user = handle.inner.option.aor(),
                                    expires = expires,
                                    alive_users = alive_users.read().unwrap().len(),
                                    "registration refreshed",
                                );
                                alive_users.write().unwrap().insert(user);
                                // NOTE(review): an `expires` of 0 or 1 yields a
                                // zero-length sleep and an immediate retry —
                                // confirm `do_register` never returns that.
                                expires * 3 / 4 // 75% of expiration time
                            }
                            Err(e) => {
                                warn!(
                                    user = handle.inner.option.aor(),
                                    alive_users = alive_users.read().unwrap().len(),
                                    "registration failed: {:?}", e);
                                // Retry a failed registration after 60 seconds.
                                60
                            }
                        };
                        tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
                    }
                } => {}
            }
            // Unregister (expiry 0) on the way out; the result is ignored.
            handle.do_register(&sip_server, Some(0)).await.ok();
            alive_users.write().unwrap().remove(&user);
        });
        Ok(())
    }
614}
615
/// Cancels the root token when the state is dropped, so tasks holding child
/// tokens are told to stop.
impl Drop for AppStateInner {
    fn drop(&mut self) {
        self.stop();
    }
}
621
622impl AppStateBuilder {
    /// Creates a builder with every option unset; `build` fills in defaults
    /// for whatever is still unset.
    pub fn new() -> Self {
        Self {
            config: None,
            stream_engine: None,
            callrecord_sender: None,
            callrecord_formatter: None,
            cancel_token: None,
            create_invitation_handler: None,
            config_path: None,
            message_inspector: None,
            target_locator: None,
            transport_inspector: None,
        }
    }
637
638    pub fn with_config(mut self, config: Config) -> Self {
639        self.config = Some(config);
640        self
641    }
642
643    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
644        self.stream_engine = Some(stream_engine);
645        self
646    }
647
648    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
649        self.callrecord_sender = Some(sender);
650        self
651    }
652
653    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
654        self.cancel_token = Some(token);
655        self
656    }
657
658    pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
659        self.config_path = path;
660        self
661    }
662
    /// Sets the SIP message inspector installed on the endpoint by `build`.
    ///
    /// Unlike the consuming setters of this builder, this one takes and
    /// returns `&mut Self`.
    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
        self.message_inspector = Some(inspector);
        self
    }
    /// Sets the target locator installed on the endpoint by `build`; it takes
    /// precedence over a locator derived from the `rewrites` configuration.
    ///
    /// Unlike the consuming setters of this builder, this one takes and
    /// returns `&mut Self`.
    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
        self.target_locator = Some(locator);
        self
    }
671
    /// Sets the transport event inspector installed on the endpoint by
    /// `build`.
    ///
    /// Unlike the consuming setters of this builder, this one takes and
    /// returns `&mut Self`.
    pub fn with_transport_inspector(
        &mut self,
        inspector: Box<dyn TransportEventInspector>,
    ) -> &mut Self {
        self.transport_inspector = Some(inspector);
        self
    }
679
    /// Consumes the builder and assembles the shared [`AppState`].
    ///
    /// Binds a UDP transport on `config.addr` (or the first non-loopback
    /// interface when `addr` is empty) and `config.udp_port`, builds the SIP
    /// endpoint and dialog layer, and — unless a sender was supplied — spawns
    /// a call-record manager when the configuration has a `callrecord`
    /// section.
    ///
    /// # Errors
    ///
    /// Fails when the local address cannot be determined or parsed, or when
    /// the UDP connection cannot be created.
    ///
    /// NOTE(review): `config_path` is not read here.
    pub async fn build(self) -> Result<AppState> {
        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
        let token = self
            .cancel_token
            .unwrap_or_else(|| CancellationToken::new());
        // The result of setting the cache directory is deliberately discarded.
        let _ = set_cache_dir(&config.media_cache_path);

        let local_ip = if !config.addr.is_empty() {
            std::net::IpAddr::from_str(config.addr.as_str())?
        } else {
            crate::net_tool::get_first_non_loopback_interface()?
        };

        // Independent of `token`: `serve` cancels it explicitly on exit.
        let endpoint_cancel_token = CancellationToken::new();
        let transport_layer =
            rsipstack::transport::TransportLayer::new(endpoint_cancel_token.clone());
        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;

        let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
            local_addr,
            None,
            Some(endpoint_cancel_token.child_token()),
        )
        .await
        .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;

        transport_layer.add_transport(udp_conn.into());
        info!("start useragent, addr: {}", local_addr);

        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
        if let Some(ref user_agent) = config.useragent {
            endpoint_builder.with_user_agent(user_agent.as_str());
        }

        // Shadows the owned builder with the value returned by the setter
        // chain; the remaining setters are applied through that binding.
        let mut endpoint_builder = endpoint_builder
            .with_cancel_token(endpoint_cancel_token.clone())
            .with_transport_layer(transport_layer)
            .with_option(endpoint_option);

        if let Some(inspector) = self.message_inspector {
            endpoint_builder = endpoint_builder.with_inspector(inspector);
        }

        // An explicitly supplied locator wins over the configured rewrites.
        if let Some(locator) = self.target_locator {
            endpoint_builder.with_target_locator(locator);
        } else if let Some(ref rules) = config.rewrites {
            endpoint_builder
                .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
        }

        if let Some(inspector) = self.transport_inspector {
            endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
        }

        let endpoint = endpoint_builder.build();
        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));

        let stream_engine = self.stream_engine.unwrap_or_default();

        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
            formatter
        } else {
            let formatter = if let Some(ref callrecord) = config.callrecord {
                DefaultCallRecordFormatter::new_with_config(callrecord)
            } else {
                DefaultCallRecordFormatter::default()
            };
            Arc::new(formatter)
        };

        // Use the caller's sender if given; otherwise run our own manager,
        // but only when call recording is configured.
        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
            Some(sender)
        } else if let Some(ref callrecord) = config.callrecord {
            let builder = CallRecordManagerBuilder::new()
                .with_cancel_token(token.child_token())
                .with_config(callrecord.clone())
                .with_max_concurrent(32)
                .with_formatter(callrecord_formatter.clone());

            let mut callrecord_manager = builder.build();
            let sender = callrecord_manager.sender.clone();
            crate::spawn(async move {
                callrecord_manager.serve().await;
            });
            Some(sender)
        } else {
            None
        };

        let app_state = Arc::new(AppStateInner {
            config,
            token,
            stream_engine,
            callrecord_sender,
            endpoint,
            registration_handles: Mutex::new(HashMap::new()),
            alive_users: Arc::new(RwLock::new(HashSet::new())),
            dialog_layer: dialog_layer.clone(),
            create_invitation_handler: self.create_invitation_handler,
            invitation: Invitation::new(dialog_layer),
            routing_state: Arc::new(crate::call::RoutingState::new()),
            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
            total_calls: AtomicU64::new(0),
            total_failed_calls: AtomicU64::new(0),
            uptime: Local::now(),
            endpoint_token: endpoint_cancel_token,
        });

        Ok(app_state)
    }
792}