Skip to main content

active_call/
app.rs

1use crate::{
2    call::{ActiveCallRef, sip::Invitation},
3    callrecord::{
4        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5    },
6    config::Config,
7    locator::RewriteTargetLocator,
8    useragent::{
9        RegisterOption,
10        invitation::{
11            FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12            default_create_invite_handler,
13        },
14        registration::{RegistrationHandle, UserCredential},
15    },
16};
17
18use crate::media::{cache::set_cache_dir, engine::StreamEngine};
19use anyhow::Result;
20use chrono::{DateTime, Local};
21use humantime::parse_duration;
22use rsip::prelude::HeadersExt;
23use rsipstack::transaction::{
24    Endpoint, TransactionReceiver,
25    endpoint::{TargetLocator, TransportEventInspector},
26};
27use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
28use std::collections::HashSet;
29use std::str::FromStr;
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use std::{collections::HashMap, net::SocketAddr};
33use std::{path::Path, sync::atomic::AtomicU64};
34use tokio::select;
35use tokio::sync::Mutex;
36use tokio_util::sync::CancellationToken;
37use tracing::{info, warn};
38
/// Shared runtime state of the SIP user agent.
///
/// Instances are assembled by [`AppStateBuilder::build`] and handed out behind an
/// `Arc` (see [`AppState`]). Dropping the value calls [`AppStateInner::stop`].
pub struct AppStateInner {
    /// Application configuration.
    pub config: Arc<Config>,
    /// Root cancellation token. [`AppStateInner::stop`] cancels it; the serve loop,
    /// pending dialogs and registration tasks each derive a `child_token()` from it.
    pub token: CancellationToken,
    /// Media stream engine (defaulted by the builder when not supplied).
    pub stream_engine: Arc<StreamEngine>,
    /// Call record sender: either supplied to the builder, or created by it when
    /// `config.callrecord` is set; `None` otherwise.
    pub callrecord_sender: Option<CallRecordSender>,
    /// SIP transaction endpoint.
    pub endpoint: Endpoint,
    /// Registration handles keyed by `RegisterOption::aor()`.
    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
    /// AORs whose latest REGISTER succeeded; an entry is removed before every
    /// refresh attempt and when the registration task ends.
    pub alive_users: Arc<RwLock<HashSet<String>>>,
    /// Dialog layer used to match and create dialogs for incoming transactions.
    pub dialog_layer: Arc<DialogLayer>,
    /// Optional factory that replaces `default_create_invite_handler`.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Invitation registry built on the same dialog layer.
    pub invitation: Invitation,
    /// Routing state handed to the invitation handler for each incoming INVITE.
    pub routing_state: Arc<crate::call::RoutingState>,
    // NOTE(review): not read or written in this file; presumably keyed by session id — confirm.
    pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
    // NOTE(review): not read or written in this file; presumably keyed by session id — confirm.
    pub pending_params: Arc<Mutex<HashMap<String, HashMap<String, serde_json::Value>>>>,

    // NOTE(review): map key is presumably the session id — confirm against callers.
    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
    /// Call counter, initialised to zero by the builder.
    pub total_calls: AtomicU64,
    /// Failed-call counter, initialised to zero by the builder.
    pub total_failed_calls: AtomicU64,
    /// Local time at which the state was built (i.e. the start time, not a duration).
    pub uptime: DateTime<Local>,
}
59
/// Reference-counted handle to the shared [`AppStateInner`].
pub type AppState = Arc<AppStateInner>;
61
/// Builder for [`AppState`].
///
/// Every field is optional; [`AppStateBuilder::build`] substitutes a default (or
/// derives a value from the configuration) for anything left as `None`.
pub struct AppStateBuilder {
    /// Configuration; `Config::default()` is used when absent.
    pub config: Option<Config>,
    /// Media stream engine; defaulted when absent.
    pub stream_engine: Option<Arc<StreamEngine>>,
    /// Pre-built call record sender; takes precedence over `config.callrecord`.
    pub callrecord_sender: Option<CallRecordSender>,
    /// Call record formatter; a `DefaultCallRecordFormatter` is used when absent.
    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
    /// Root cancellation token; a fresh one is created when absent.
    pub cancel_token: Option<CancellationToken>,
    /// Optional factory overriding the default invitation handler.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Path of the configuration file. NOTE(review): stored but not read by `build` in this file.
    pub config_path: Option<String>,

    /// Optional SIP message inspector installed on the endpoint.
    pub message_inspector: Option<Box<dyn MessageInspector>>,
    /// Optional target locator; takes precedence over `config.rewrites`.
    pub target_locator: Option<Box<dyn TargetLocator>>,
    /// Optional transport event inspector installed on the endpoint.
    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
}
75
76impl AppStateInner {
77    pub fn get_dump_events_file(&self, session_id: &String) -> String {
78        let recorder_root = self.config.recorder_path();
79        let root = Path::new(&recorder_root);
80        if !root.exists() {
81            match std::fs::create_dir_all(root) {
82                Ok(_) => {
83                    info!("created dump events root: {}", root.to_string_lossy());
84                }
85                Err(e) => {
86                    warn!(
87                        "Failed to create dump events root: {} {}",
88                        e,
89                        root.to_string_lossy()
90                    );
91                }
92            }
93        }
94        root.join(format!("{}.events.jsonl", session_id))
95            .to_string_lossy()
96            .to_string()
97    }
98
99    pub fn get_recorder_file(&self, session_id: &String) -> String {
100        let recorder_root = self.config.recorder_path();
101        let root = Path::new(&recorder_root);
102        if !root.exists() {
103            match std::fs::create_dir_all(root) {
104                Ok(_) => {
105                    info!("created recorder root: {}", root.to_string_lossy());
106                }
107                Err(e) => {
108                    warn!(
109                        "Failed to create recorder root: {} {}",
110                        e,
111                        root.to_string_lossy()
112                    );
113                }
114            }
115        }
116        let desired_ext = self.config.recorder_format().extension();
117        let mut filename = session_id.clone();
118        if !filename
119            .to_lowercase()
120            .ends_with(&format!(".{}", desired_ext.to_lowercase()))
121        {
122            filename = format!("{}.{}", filename, desired_ext);
123        }
124        root.join(filename).to_string_lossy().to_string()
125    }
126
127    pub async fn serve(self: Arc<Self>) -> Result<()> {
128        let incoming_txs = self.endpoint.incoming_transactions()?;
129        let token = self.token.child_token();
130        let endpoint_inner = self.endpoint.inner.clone();
131        let dialog_layer = self.dialog_layer.clone();
132        let app_state_clone = self.clone();
133
134        match self.start_registration().await {
135            Ok(count) => {
136                info!("registration started, count: {}", count);
137            }
138            Err(e) => {
139                warn!("failed to start registration: {:?}", e);
140            }
141        }
142
143        tokio::select! {
144            _ = token.cancelled() => {
145                info!("cancelled");
146            }
147            result = endpoint_inner.serve() => {
148                if let Err(e) = result {
149                    info!("endpoint serve error: {:?}", e);
150                }
151            }
152            result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
153                if let Err(e) = result {
154                    info!("process incoming request error: {:?}", e);
155                }
156            },
157        }
158
159        // Wait for registration to stop, if not stopped within 50 seconds,
160        // force stop it.
161        let timeout = self
162            .config
163            .graceful_shutdown
164            .map(|_| Duration::from_secs(50));
165
166        match self.stop_registration(timeout).await {
167            Ok(_) => {
168                info!("registration stopped, waiting for clear");
169            }
170            Err(e) => {
171                warn!("failed to stop registration: {:?}", e);
172            }
173        }
174        info!("stopping");
175        Ok(())
176    }
177
178    async fn process_incoming_request(
179        self: Arc<Self>,
180        dialog_layer: Arc<DialogLayer>,
181        mut incoming: TransactionReceiver,
182    ) -> Result<()> {
183        while let Some(mut tx) = incoming.recv().await {
184            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
185            info!(?key, "received transaction");
186            if tx.original.to_header()?.tag()?.as_ref().is_some() {
187                match dialog_layer.match_dialog(&tx) {
188                    Some(mut d) => {
189                        crate::spawn(async move {
190                            match d.handle(&mut tx).await {
191                                Ok(_) => (),
192                                Err(e) => {
193                                    info!("error handling transaction: {:?}", e);
194                                }
195                            }
196                        });
197                        continue;
198                    }
199                    None => {
200                        info!("dialog not found: {}", tx.original);
201                        match tx
202                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
203                            .await
204                        {
205                            Ok(_) => (),
206                            Err(e) => {
207                                info!("error replying to request: {:?}", e);
208                            }
209                        }
210                        continue;
211                    }
212                }
213            }
214            // out dialog, new server dialog
215            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
216            match tx.original.method {
217                rsip::Method::Invite | rsip::Method::Ack => {
218                    let invitation_handler = match self.create_invitation_handler {
219                        Some(ref create_invitation_handler) => {
220                            create_invitation_handler(self.config.handler.as_ref()).ok()
221                        }
222                        _ => default_create_invite_handler(
223                            self.config.handler.as_ref(),
224                            Some(self.clone()),
225                        ),
226                    };
227                    let invitation_handler = match invitation_handler {
228                        Some(h) => h,
229                        None => {
230                            info!(?key, "no invite handler configured, rejecting INVITE");
231                            match tx
232                                .reply_with(
233                                    rsip::StatusCode::ServiceUnavailable,
234                                    vec![rsip::Header::Other(
235                                        "Reason".into(),
236                                        "SIP;cause=503;text=\"No invite handler configured\""
237                                            .into(),
238                                    )],
239                                    None,
240                                )
241                                .await
242                            {
243                                Ok(_) => (),
244                                Err(e) => {
245                                    info!("error replying to request: {:?}", e);
246                                }
247                            }
248                            continue;
249                        }
250                    };
251                    let contact = dialog_layer
252                        .endpoint
253                        .get_addrs()
254                        .first()
255                        .map(|addr| rsip::Uri {
256                            scheme: Some(rsip::Scheme::Sip),
257                            auth: None,
258                            host_with_port: addr.addr.clone(),
259                            params: vec![],
260                            headers: vec![],
261                        });
262
263                    let dialog = match dialog_layer.get_or_create_server_invite(
264                        &tx,
265                        state_sender,
266                        None,
267                        contact,
268                    ) {
269                        Ok(d) => d,
270                        Err(e) => {
271                            // 481 Dialog/Transaction Does Not Exist
272                            info!("failed to obtain dialog: {:?}", e);
273                            match tx
274                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
275                                .await
276                            {
277                                Ok(_) => (),
278                                Err(e) => {
279                                    info!("error replying to request: {:?}", e);
280                                }
281                            }
282                            continue;
283                        }
284                    };
285
286                    let dialog_id = dialog.id();
287                    let dialog_id_str = dialog_id.to_string();
288                    let token = self.token.child_token();
289                    let pending_dialog = PendingDialog {
290                        token: token.clone(),
291                        dialog: dialog.clone(),
292                        state_receiver,
293                    };
294
295                    let guard = Arc::new(PendingDialogGuard::new(
296                        self.invitation.clone(),
297                        dialog_id,
298                        pending_dialog,
299                    ));
300
301                    let accept_timeout = self
302                        .config
303                        .accept_timeout
304                        .as_ref()
305                        .and_then(|t| parse_duration(t).ok())
306                        .unwrap_or_else(|| Duration::from_secs(60));
307
308                    let token_ref = token.clone();
309                    let guard_ref = guard.clone();
310                    crate::spawn(async move {
311                        select! {
312                            _ = token_ref.cancelled() => {}
313                            _ = tokio::time::sleep(accept_timeout) => {}
314                        }
315                        guard_ref.drop_async().await;
316                    });
317
318                    let mut dialog_ref = dialog.clone();
319                    let token_ref = token.clone();
320                    let routing_state = self.routing_state.clone();
321                    let dialog_for_reject = dialog.clone();
322                    let guard_ref = guard.clone();
323                    crate::spawn(async move {
324                        let invite_loop = async {
325                            match invitation_handler
326                                .on_invite(
327                                    dialog_id_str.clone(),
328                                    token.clone(),
329                                    dialog.clone(),
330                                    routing_state,
331                                )
332                                .await
333                            {
334                                Ok(_) => (),
335                                Err(e) => {
336                                    // Webhook failed, reject the call immediately
337                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
338                                    let reason = format!("Failed to process invite: {}", e);
339                                    if let Err(reject_err) = dialog_for_reject.reject(
340                                        Some(rsip::StatusCode::ServiceUnavailable),
341                                        Some(reason),
342                                    ) {
343                                        info!(
344                                            id = dialog_id_str,
345                                            "error rejecting call: {:?}", reject_err
346                                        );
347                                    }
348                                    // Cancel token to stop dialog handling
349                                    token.cancel();
350                                    guard_ref.drop_async().await;
351                                }
352                            }
353                        };
354                        select! {
355                            _ = token_ref.cancelled() => {}
356                            _ = async {
357                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
358                             } => {}
359                        }
360                    });
361                }
362                rsip::Method::Options => {
363                    info!(?key, "ignoring out-of-dialog OPTIONS request");
364                    continue;
365                }
366                _ => {
367                    info!(?key, "received request: {:?}", tx.original.method);
368                    match tx.reply(rsip::StatusCode::OK).await {
369                        Ok(_) => (),
370                        Err(e) => {
371                            info!("error replying to request: {:?}", e);
372                        }
373                    }
374                }
375            }
376        }
377        Ok(())
378    }
379
    /// Requests shutdown by cancelling the root token; this also cancels every
    /// token derived from it with `child_token()`.
    pub fn stop(&self) {
        info!("stopping");
        self.token.cancel();
    }
384
385    pub async fn start_registration(&self) -> Result<usize> {
386        let mut count = 0;
387        if let Some(register_users) = &self.config.register_users {
388            for option in register_users.iter() {
389                match self.register(option.clone()).await {
390                    Ok(_) => {
391                        count += 1;
392                    }
393                    Err(e) => {
394                        warn!("failed to register user: {:?} {:?}", e, option);
395                    }
396                }
397            }
398        }
399        Ok(count)
400    }
401
402    pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
403        let callee_uri = callee
404            .strip_prefix("sip:")
405            .or_else(|| callee.strip_prefix("sips:"))
406            .unwrap_or(callee);
407        let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
408            format!("sip:{}", callee_uri)
409        } else {
410            callee_uri.to_string()
411        };
412
413        let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
414            Ok(uri) => uri,
415            Err(e) => {
416                warn!("failed to parse callee URI: {} {:?}", callee, e);
417                return None;
418            }
419        };
420
421        let callee_host = match &parsed_callee.host_with_port.host {
422            rsip::Host::Domain(domain) => domain.to_string(),
423            rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
424        };
425
426        // Look through registered users to find one matching this domain
427        if let Some(register_users) = &self.config.register_users {
428            for option in register_users.iter() {
429                let mut server = option.server.clone();
430                if !server.starts_with("sip:") && !server.starts_with("sips:") {
431                    server = format!("sip:{}", server);
432                }
433
434                let parsed_server = match rsip::Uri::try_from(server.as_str()) {
435                    Ok(uri) => uri,
436                    Err(e) => {
437                        warn!("failed to parse server URI: {} {:?}", option.server, e);
438                        continue;
439                    }
440                };
441
442                let server_host = match &parsed_server.host_with_port.host {
443                    rsip::Host::Domain(domain) => domain.to_string(),
444                    rsip::Host::IpAddr(ip) => {
445                        // Compare IP addresses
446                        if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
447                            if ip == callee_ip {
448                                if let Some(cred) = &option.credential {
449                                    info!(
450                                        callee,
451                                        username = cred.username,
452                                        server = option.server,
453                                        "Auto-injecting credentials from registered user for outbound call (IP match)"
454                                    );
455                                    return Some(cred.clone());
456                                }
457                            }
458                        }
459                        continue;
460                    }
461                };
462
463                if server_host == callee_host {
464                    if let Some(cred) = &option.credential {
465                        info!(
466                            callee,
467                            username = cred.username,
468                            server = option.server,
469                            "Auto-injecting credentials from registered user for outbound call"
470                        );
471                        return Some(cred.clone());
472                    }
473                }
474            }
475        }
476
477        None
478    }
479
480    /// Helper function to find credentials by IP address
481    fn find_credentials_by_ip(
482        &self,
483        callee_ip: &std::net::IpAddr,
484    ) -> Option<crate::useragent::registration::UserCredential> {
485        if let Some(register_users) = &self.config.register_users {
486            for option in register_users.iter() {
487                let mut server = option.server.clone();
488                if !server.starts_with("sip:") && !server.starts_with("sips:") {
489                    server = format!("sip:{}", server);
490                }
491
492                if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
493                    if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
494                        if server_ip == callee_ip {
495                            if let Some(cred) = &option.credential {
496                                info!(
497                                    callee_ip = %callee_ip,
498                                    username = cred.username,
499                                    server = option.server,
500                                    "Auto-injecting credentials from registered user for outbound call (IP match)"
501                                );
502                                return Some(cred.clone());
503                            }
504                        }
505                    }
506                }
507            }
508        }
509        None
510    }
511
512    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
513        for (_, handle) in self.registration_handles.lock().await.iter_mut() {
514            handle.stop();
515        }
516
517        if let Some(duration) = wait_for_clear {
518            let live_users = self.alive_users.clone();
519            let check_loop = async move {
520                loop {
521                    let is_empty = {
522                        let users = live_users
523                            .read()
524                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
525                        users.is_empty()
526                    };
527                    if is_empty {
528                        break;
529                    }
530                    tokio::time::sleep(Duration::from_millis(50)).await;
531                }
532                Ok::<(), anyhow::Error>(())
533            };
534            match tokio::time::timeout(duration, check_loop).await {
535                Ok(_) => {}
536                Err(e) => {
537                    warn!("failed to wait for clear: {}", e);
538                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
539                }
540            }
541        }
542        Ok(())
543    }
544
545    pub async fn register(&self, option: RegisterOption) -> Result<()> {
546        let user = option.aor();
547        let mut server = option.server.clone();
548        if !server.starts_with("sip:") && !server.starts_with("sips:") {
549            server = format!("sip:{}", server);
550        }
551        let sip_server = match rsip::Uri::try_from(server) {
552            Ok(uri) => uri,
553            Err(e) => {
554                warn!("failed to parse server: {} {:?}", e, option.server);
555                return Err(anyhow::anyhow!("failed to parse server: {}", e));
556            }
557        };
558        let cancel_token = self.token.child_token();
559        let handle = RegistrationHandle {
560            inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
561                endpoint_inner: self.endpoint.inner.clone(),
562                option,
563                cancel_token,
564                start_time: Mutex::new(std::time::Instant::now()),
565                last_update: Mutex::new(std::time::Instant::now()),
566                last_response: Mutex::new(None),
567            }),
568        };
569        self.registration_handles
570            .lock()
571            .await
572            .insert(user.clone(), handle.clone());
573        tracing::debug!(user = user.as_str(), "starting registration task");
574        let alive_users = self.alive_users.clone();
575
576        crate::spawn(async move {
577            *handle.inner.start_time.lock().await = std::time::Instant::now();
578
579            select! {
580                _ = handle.inner.cancel_token.cancelled() => {
581                }
582                _ = async {
583                    loop {
584                        let user = handle.inner.option.aor();
585                        alive_users.write().unwrap().remove(&user);
586                        let refresh_time = match handle.do_register(&sip_server, None).await {
587                            Ok(expires) => {
588                                info!(
589                                    user = handle.inner.option.aor(),
590                                    expires = expires,
591                                    alive_users = alive_users.read().unwrap().len(),
592                                    "registration refreshed",
593                                );
594                                alive_users.write().unwrap().insert(user);
595                                expires * 3 / 4 // 75% of expiration time
596                            }
597                            Err(e) => {
598                                warn!(
599                                    user = handle.inner.option.aor(),
600                                    alive_users = alive_users.read().unwrap().len(),
601                                    "registration failed: {:?}", e);
602                                60
603                            }
604                        };
605                        tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
606                    }
607                } => {}
608            }
609            handle.do_register(&sip_server, Some(0)).await.ok();
610            alive_users.write().unwrap().remove(&user);
611        });
612        Ok(())
613    }
614}
615
/// Dropping the state cancels its root token (via `stop`), which in turn cancels
/// every child token derived from it.
impl Drop for AppStateInner {
    fn drop(&mut self) {
        self.stop();
    }
}
621
622impl AppStateBuilder {
    /// Creates a builder with every option unset; `build` supplies the defaults.
    pub fn new() -> Self {
        Self {
            config: None,
            stream_engine: None,
            callrecord_sender: None,
            callrecord_formatter: None,
            cancel_token: None,
            create_invitation_handler: None,
            config_path: None,
            message_inspector: None,
            target_locator: None,
            transport_inspector: None,
        }
    }
637
638    pub fn with_config(mut self, config: Config) -> Self {
639        self.config = Some(config);
640        self
641    }
642
643    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
644        self.stream_engine = Some(stream_engine);
645        self
646    }
647
648    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
649        self.callrecord_sender = Some(sender);
650        self
651    }
652
653    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
654        self.cancel_token = Some(token);
655        self
656    }
657
658    pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
659        self.config_path = path;
660        self
661    }
662
    /// Sets the SIP message inspector installed on the endpoint.
    ///
    /// Unlike the by-value setters on this builder, this one borrows the builder
    /// mutably and returns `&mut Self`.
    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
        self.message_inspector = Some(inspector);
        self
    }
    /// Sets the target locator; `build` prefers it over one derived from
    /// `config.rewrites`. Borrows the builder mutably and returns `&mut Self`.
    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
        self.target_locator = Some(locator);
        self
    }
671
    /// Sets the transport event inspector installed on the endpoint.
    /// Borrows the builder mutably and returns `&mut Self`.
    pub fn with_transport_inspector(
        &mut self,
        inspector: Box<dyn TransportEventInspector>,
    ) -> &mut Self {
        self.transport_inspector = Some(inspector);
        self
    }
679
680    pub async fn build(self) -> Result<AppState> {
681        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
682        let token = self
683            .cancel_token
684            .unwrap_or_else(|| CancellationToken::new());
685        let _ = set_cache_dir(&config.media_cache_path);
686
687        let local_ip = if !config.addr.is_empty() {
688            std::net::IpAddr::from_str(config.addr.as_str())?
689        } else {
690            crate::net_tool::get_first_non_loopback_interface()?
691        };
692        let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
693        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
694
695        let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
696            local_addr,
697            None,
698            Some(token.child_token()),
699        )
700        .await
701        .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
702
703        transport_layer.add_transport(udp_conn.into());
704        info!("start useragent, addr: {}", local_addr);
705
706        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
707        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
708        if let Some(ref user_agent) = config.useragent {
709            endpoint_builder.with_user_agent(user_agent.as_str());
710        }
711
712        let mut endpoint_builder = endpoint_builder
713            .with_cancel_token(token.child_token())
714            .with_transport_layer(transport_layer)
715            .with_option(endpoint_option);
716
717        if let Some(inspector) = self.message_inspector {
718            endpoint_builder = endpoint_builder.with_inspector(inspector);
719        }
720
721        if let Some(locator) = self.target_locator {
722            endpoint_builder.with_target_locator(locator);
723        } else if let Some(ref rules) = config.rewrites {
724            endpoint_builder
725                .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
726        }
727
728        if let Some(inspector) = self.transport_inspector {
729            endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
730        }
731
732        let endpoint = endpoint_builder.build();
733        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
734
735        let stream_engine = self.stream_engine.unwrap_or_default();
736
737        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
738            formatter
739        } else {
740            let formatter = if let Some(ref callrecord) = config.callrecord {
741                DefaultCallRecordFormatter::new_with_config(callrecord)
742            } else {
743                DefaultCallRecordFormatter::default()
744            };
745            Arc::new(formatter)
746        };
747
748        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
749            Some(sender)
750        } else if let Some(ref callrecord) = config.callrecord {
751            let builder = CallRecordManagerBuilder::new()
752                .with_cancel_token(token.child_token())
753                .with_config(callrecord.clone())
754                .with_max_concurrent(32)
755                .with_formatter(callrecord_formatter.clone());
756
757            let mut callrecord_manager = builder.build();
758            let sender = callrecord_manager.sender.clone();
759            crate::spawn(async move {
760                callrecord_manager.serve().await;
761            });
762            Some(sender)
763        } else {
764            None
765        };
766
767        let app_state = Arc::new(AppStateInner {
768            config,
769            token,
770            stream_engine,
771            callrecord_sender,
772            endpoint,
773            registration_handles: Mutex::new(HashMap::new()),
774            alive_users: Arc::new(RwLock::new(HashSet::new())),
775            dialog_layer: dialog_layer.clone(),
776            create_invitation_handler: self.create_invitation_handler,
777            invitation: Invitation::new(dialog_layer),
778            routing_state: Arc::new(crate::call::RoutingState::new()),
779            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
780            pending_params: Arc::new(Mutex::new(HashMap::new())),
781            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
782            total_calls: AtomicU64::new(0),
783            total_failed_calls: AtomicU64::new(0),
784            uptime: Local::now(),
785        });
786
787        Ok(app_state)
788    }
789}