1use crate::media::{cache::set_cache_dir, engine::StreamEngine};
2use crate::{
3 call::{ActiveCallRef, sip::Invitation},
4 callrecord::{
5 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
6 },
7 config::Config,
8 locator::RewriteTargetLocator,
9 useragent::{
10 RegisterOption,
11 invitation::{
12 FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
13 default_create_invite_handler,
14 },
15 registration::{RegistrationHandle, UserCredential},
16 },
17};
18use anyhow::Result;
19use chrono::{DateTime, Local};
20use futures::{FutureExt, future};
21use humantime::parse_duration;
22use rsip::prelude::HeadersExt;
23use rsipstack::transaction::{
24 Endpoint, TransactionReceiver,
25 endpoint::{TargetLocator, TransportEventInspector},
26};
27use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
28use std::collections::HashSet;
29use std::str::FromStr;
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use std::{collections::HashMap, net::SocketAddr};
33use std::{path::Path, sync::atomic::AtomicU64};
34use tokio::select;
35use tokio::sync::Mutex;
36use tokio_util::sync::CancellationToken;
37use tracing::{info, warn};
38
39pub struct AppStateInner {
40 pub config: Arc<Config>,
41 pub token: CancellationToken,
42 pub endpoint_token: CancellationToken,
43 pub stream_engine: Arc<StreamEngine>,
44 pub callrecord_sender: Option<CallRecordSender>,
45 pub endpoint: Endpoint,
46 pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
47 pub alive_users: Arc<RwLock<HashSet<String>>>,
48 pub dialog_layer: Arc<DialogLayer>,
49 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
50 pub invitation: Invitation,
51 pub routing_state: Arc<crate::call::RoutingState>,
52 pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
53
54 pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
55 pub total_calls: AtomicU64,
56 pub total_failed_calls: AtomicU64,
57 pub uptime: DateTime<Local>,
58}
59
60pub type AppState = Arc<AppStateInner>;
61
62pub struct AppStateBuilder {
63 pub config: Option<Config>,
64 pub stream_engine: Option<Arc<StreamEngine>>,
65 pub callrecord_sender: Option<CallRecordSender>,
66 pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
67 pub cancel_token: Option<CancellationToken>,
68 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
69 pub config_path: Option<String>,
70
71 pub message_inspector: Option<Box<dyn MessageInspector>>,
72 pub target_locator: Option<Box<dyn TargetLocator>>,
73 pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
74}
75
76impl AppStateInner {
77 pub fn get_dump_events_file(&self, session_id: &String) -> String {
78 let recorder_root = self.config.recorder_path();
79 let root = Path::new(&recorder_root);
80 if !root.exists() {
81 match std::fs::create_dir_all(root) {
82 Ok(_) => {
83 info!("created dump events root: {}", root.to_string_lossy());
84 }
85 Err(e) => {
86 warn!(
87 "Failed to create dump events root: {} {}",
88 e,
89 root.to_string_lossy()
90 );
91 }
92 }
93 }
94 root.join(format!("{}.events.jsonl", session_id))
95 .to_string_lossy()
96 .to_string()
97 }
98
99 pub fn get_recorder_file(&self, session_id: &String) -> String {
100 let recorder_root = self.config.recorder_path();
101 let root = Path::new(&recorder_root);
102 if !root.exists() {
103 match std::fs::create_dir_all(root) {
104 Ok(_) => {
105 info!("created recorder root: {}", root.to_string_lossy());
106 }
107 Err(e) => {
108 warn!(
109 "Failed to create recorder root: {} {}",
110 e,
111 root.to_string_lossy()
112 );
113 }
114 }
115 }
116 let desired_ext = self.config.recorder_format().extension();
117 let mut filename = session_id.clone();
118 if !filename
119 .to_lowercase()
120 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
121 {
122 filename = format!("{}.{}", filename, desired_ext);
123 }
124 root.join(filename).to_string_lossy().to_string()
125 }
126
127 pub async fn serve(self: Arc<Self>) -> Result<()> {
128 let incoming_txs = self.endpoint.incoming_transactions()?;
129 let token = self.token.child_token();
130 let endpoint_inner = self.endpoint.inner.clone();
131 let dialog_layer = self.dialog_layer.clone();
132 let app_state_clone = self.clone();
133
134 match self.start_registration().await {
135 Ok(count) => {
136 info!("registration started, count: {}", count);
137 }
138 Err(e) => {
139 warn!("failed to start registration: {:?}", e);
140 }
141 }
142
143 let serving = endpoint_inner.serve();
144 let processing =
145 app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs);
146 let mut cancelled = false;
147 let stop_registration = future::pending().boxed();
148 tokio::pin!(serving);
149 tokio::pin!(processing);
150 tokio::pin!(stop_registration);
151
152 loop {
153 tokio::select! {
154 _ = token.cancelled(), if !cancelled=> {
155 cancelled = true;
156 let timeout = self
157 .config
158 .graceful_shutdown
159 .map(|_| Duration::from_secs(5));
160 *stop_registration = self.stop_registration(timeout).boxed();
161 }
162 result = &mut serving => {
163 if let Err(e) = result {
164 info!("endpoint serve error: {:?}", e);
165 }
166 break;
167 }
168 result = &mut processing => {
169 if let Err(e) = result {
170 info!("process incoming request error: {:?}", e);
171 }
172 break;
173 },
174 result = &mut stop_registration => {
175 match result {
176 Ok(_) => {
177 info!("registration stopped, waiting for clear");
178 }
179 Err(e) => {
180 warn!("failed to stop registration: {:?}", e);
181 }
182 }
183 break;
184 },
185 }
186 }
187
188 self.endpoint_token.cancel();
189 info!("stopping");
190 Ok(())
191 }
192
193 async fn process_incoming_request(
194 self: Arc<Self>,
195 dialog_layer: Arc<DialogLayer>,
196 mut incoming: TransactionReceiver,
197 ) -> Result<()> {
198 while let Some(mut tx) = incoming.recv().await {
199 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
200 info!(?key, "received transaction");
201 if tx.original.to_header()?.tag()?.as_ref().is_some() {
202 match dialog_layer.match_dialog(&tx) {
203 Some(mut d) => {
204 crate::spawn(async move {
205 match d.handle(&mut tx).await {
206 Ok(_) => (),
207 Err(e) => {
208 info!("error handling transaction: {:?}", e);
209 }
210 }
211 });
212 continue;
213 }
214 None => {
215 info!("dialog not found: {}", tx.original);
216 match tx
217 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
218 .await
219 {
220 Ok(_) => (),
221 Err(e) => {
222 info!("error replying to request: {:?}", e);
223 }
224 }
225 continue;
226 }
227 }
228 }
229 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
231 match tx.original.method {
232 rsip::Method::Invite | rsip::Method::Ack => {
233 let invitation_handler = match self.create_invitation_handler {
234 Some(ref create_invitation_handler) => {
235 create_invitation_handler(self.config.handler.as_ref()).ok()
236 }
237 _ => default_create_invite_handler(
238 self.config.handler.as_ref(),
239 Some(self.clone()),
240 ),
241 };
242 let invitation_handler = match invitation_handler {
243 Some(h) => h,
244 None => {
245 info!(?key, "no invite handler configured, rejecting INVITE");
246 match tx
247 .reply_with(
248 rsip::StatusCode::ServiceUnavailable,
249 vec![rsip::Header::Other(
250 "Reason".into(),
251 "SIP;cause=503;text=\"No invite handler configured\""
252 .into(),
253 )],
254 None,
255 )
256 .await
257 {
258 Ok(_) => (),
259 Err(e) => {
260 info!("error replying to request: {:?}", e);
261 }
262 }
263 continue;
264 }
265 };
266 let contact = dialog_layer
267 .endpoint
268 .get_addrs()
269 .first()
270 .map(|addr| rsip::Uri {
271 scheme: Some(rsip::Scheme::Sip),
272 auth: None,
273 host_with_port: addr.addr.clone(),
274 params: vec![],
275 headers: vec![],
276 });
277
278 let dialog = match dialog_layer.get_or_create_server_invite(
279 &tx,
280 state_sender,
281 None,
282 contact,
283 ) {
284 Ok(d) => d,
285 Err(e) => {
286 info!("failed to obtain dialog: {:?}", e);
288 match tx
289 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
290 .await
291 {
292 Ok(_) => (),
293 Err(e) => {
294 info!("error replying to request: {:?}", e);
295 }
296 }
297 continue;
298 }
299 };
300
301 let dialog_id = dialog.id();
302 let dialog_id_str = dialog_id.to_string();
303 let token = self.token.child_token();
304 let pending_dialog = PendingDialog {
305 token: token.clone(),
306 dialog: dialog.clone(),
307 state_receiver,
308 };
309
310 let guard = Arc::new(PendingDialogGuard::new(
311 self.invitation.clone(),
312 dialog_id,
313 pending_dialog,
314 ));
315
316 let accept_timeout = self
317 .config
318 .accept_timeout
319 .as_ref()
320 .and_then(|t| parse_duration(t).ok())
321 .unwrap_or_else(|| Duration::from_secs(60));
322
323 let token_ref = token.clone();
324 let guard_ref = guard.clone();
325 crate::spawn(async move {
326 select! {
327 _ = token_ref.cancelled() => {}
328 _ = tokio::time::sleep(accept_timeout) => {}
329 }
330 guard_ref.drop_async().await;
331 });
332
333 let mut dialog_ref = dialog.clone();
334 let token_ref = token.clone();
335 let routing_state = self.routing_state.clone();
336 crate::spawn(async move {
337 let invite_loop = async {
338 match invitation_handler
339 .on_invite(
340 dialog_id_str.clone(),
341 token,
342 dialog.clone(),
343 routing_state,
344 )
345 .await
346 {
347 Ok(_) => (),
348 Err(e) => {
349 info!(id = dialog_id_str, "error handling invite: {:?}", e);
350 guard.drop_async().await;
351 }
352 }
353 };
354 select! {
355 _ = token_ref.cancelled() => {}
356 _ = async {
357 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
358 } => {}
359 }
360 });
361 }
362 rsip::Method::Options => {
363 info!(?key, "ignoring out-of-dialog OPTIONS request");
364 continue;
365 }
366 _ => {
367 info!(?key, "received request: {:?}", tx.original.method);
368 match tx.reply(rsip::StatusCode::OK).await {
369 Ok(_) => (),
370 Err(e) => {
371 info!("error replying to request: {:?}", e);
372 }
373 }
374 }
375 }
376 }
377 Ok(())
378 }
379
380 pub fn stop(&self) {
381 info!("stopping");
382 self.token.cancel();
383 }
384
385 pub async fn start_registration(&self) -> Result<usize> {
386 let mut count = 0;
387 if let Some(register_users) = &self.config.register_users {
388 for option in register_users.iter() {
389 match self.register(option.clone()).await {
390 Ok(_) => {
391 count += 1;
392 }
393 Err(e) => {
394 warn!("failed to register user: {:?} {:?}", e, option);
395 }
396 }
397 }
398 }
399 Ok(count)
400 }
401
402 pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
403 let callee_uri = callee
404 .strip_prefix("sip:")
405 .or_else(|| callee.strip_prefix("sips:"))
406 .unwrap_or(callee);
407 let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
408 format!("sip:{}", callee_uri)
409 } else {
410 callee_uri.to_string()
411 };
412
413 let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
414 Ok(uri) => uri,
415 Err(e) => {
416 warn!("failed to parse callee URI: {} {:?}", callee, e);
417 return None;
418 }
419 };
420
421 let callee_host = match &parsed_callee.host_with_port.host {
422 rsip::Host::Domain(domain) => domain.to_string(),
423 rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
424 };
425
426 if let Some(register_users) = &self.config.register_users {
428 for option in register_users.iter() {
429 let mut server = option.server.clone();
430 if !server.starts_with("sip:") && !server.starts_with("sips:") {
431 server = format!("sip:{}", server);
432 }
433
434 let parsed_server = match rsip::Uri::try_from(server.as_str()) {
435 Ok(uri) => uri,
436 Err(e) => {
437 warn!("failed to parse server URI: {} {:?}", option.server, e);
438 continue;
439 }
440 };
441
442 let server_host = match &parsed_server.host_with_port.host {
443 rsip::Host::Domain(domain) => domain.to_string(),
444 rsip::Host::IpAddr(ip) => {
445 if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
447 if ip == callee_ip {
448 if let Some(cred) = &option.credential {
449 info!(
450 callee,
451 username = cred.username,
452 server = option.server,
453 "Auto-injecting credentials from registered user for outbound call (IP match)"
454 );
455 return Some(cred.clone());
456 }
457 }
458 }
459 continue;
460 }
461 };
462
463 if server_host == callee_host {
464 if let Some(cred) = &option.credential {
465 info!(
466 callee,
467 username = cred.username,
468 server = option.server,
469 "Auto-injecting credentials from registered user for outbound call"
470 );
471 return Some(cred.clone());
472 }
473 }
474 }
475 }
476
477 None
478 }
479
480 fn find_credentials_by_ip(
482 &self,
483 callee_ip: &std::net::IpAddr,
484 ) -> Option<crate::useragent::registration::UserCredential> {
485 if let Some(register_users) = &self.config.register_users {
486 for option in register_users.iter() {
487 let mut server = option.server.clone();
488 if !server.starts_with("sip:") && !server.starts_with("sips:") {
489 server = format!("sip:{}", server);
490 }
491
492 if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
493 if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
494 if server_ip == callee_ip {
495 if let Some(cred) = &option.credential {
496 info!(
497 callee_ip = %callee_ip,
498 username = cred.username,
499 server = option.server,
500 "Auto-injecting credentials from registered user for outbound call (IP match)"
501 );
502 return Some(cred.clone());
503 }
504 }
505 }
506 }
507 }
508 }
509 None
510 }
511
512 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
513 for (_, handle) in self.registration_handles.lock().await.iter_mut() {
514 handle.stop();
515 }
516
517 if let Some(duration) = wait_for_clear {
518 let live_users = self.alive_users.clone();
519 let check_loop = async move {
520 loop {
521 let is_empty = {
522 let users = live_users
523 .read()
524 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
525 users.is_empty()
526 };
527 if is_empty {
528 break;
529 }
530 tokio::time::sleep(Duration::from_millis(50)).await;
531 }
532 Ok::<(), anyhow::Error>(())
533 };
534 match tokio::time::timeout(duration, check_loop).await {
535 Ok(_) => {}
536 Err(e) => {
537 warn!("failed to wait for clear: {}", e);
538 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
539 }
540 }
541 }
542 Ok(())
543 }
544
545 pub async fn register(&self, option: RegisterOption) -> Result<()> {
546 let user = option.aor();
547 let mut server = option.server.clone();
548 if !server.starts_with("sip:") && !server.starts_with("sips:") {
549 server = format!("sip:{}", server);
550 }
551 let sip_server = match rsip::Uri::try_from(server) {
552 Ok(uri) => uri,
553 Err(e) => {
554 warn!("failed to parse server: {} {:?}", e, option.server);
555 return Err(anyhow::anyhow!("failed to parse server: {}", e));
556 }
557 };
558 let cancel_token = self.token.child_token();
559 let handle = RegistrationHandle {
560 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
561 endpoint_inner: self.endpoint.inner.clone(),
562 option,
563 cancel_token,
564 start_time: Mutex::new(std::time::Instant::now()),
565 last_update: Mutex::new(std::time::Instant::now()),
566 last_response: Mutex::new(None),
567 }),
568 };
569 self.registration_handles
570 .lock()
571 .await
572 .insert(user.clone(), handle.clone());
573 tracing::debug!(user = user.as_str(), "starting registration task");
574 let alive_users = self.alive_users.clone();
575
576 crate::spawn(async move {
577 *handle.inner.start_time.lock().await = std::time::Instant::now();
578
579 select! {
580 _ = handle.inner.cancel_token.cancelled() => {
581 }
582 _ = async {
583 loop {
584 let user = handle.inner.option.aor();
585 alive_users.write().unwrap().remove(&user);
586 let refresh_time = match handle.do_register(&sip_server, None).await {
587 Ok(expires) => {
588 info!(
589 user = handle.inner.option.aor(),
590 expires = expires,
591 alive_users = alive_users.read().unwrap().len(),
592 "registration refreshed",
593 );
594 alive_users.write().unwrap().insert(user);
595 expires * 3 / 4 }
597 Err(e) => {
598 warn!(
599 user = handle.inner.option.aor(),
600 alive_users = alive_users.read().unwrap().len(),
601 "registration failed: {:?}", e);
602 60
603 }
604 };
605 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
606 }
607 } => {}
608 }
609 handle.do_register(&sip_server, Some(0)).await.ok();
610 alive_users.write().unwrap().remove(&user);
611 });
612 Ok(())
613 }
614}
615
616impl Drop for AppStateInner {
617 fn drop(&mut self) {
618 self.stop();
619 }
620}
621
622impl AppStateBuilder {
623 pub fn new() -> Self {
624 Self {
625 config: None,
626 stream_engine: None,
627 callrecord_sender: None,
628 callrecord_formatter: None,
629 cancel_token: None,
630 create_invitation_handler: None,
631 config_path: None,
632 message_inspector: None,
633 target_locator: None,
634 transport_inspector: None,
635 }
636 }
637
638 pub fn with_config(mut self, config: Config) -> Self {
639 self.config = Some(config);
640 self
641 }
642
643 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
644 self.stream_engine = Some(stream_engine);
645 self
646 }
647
648 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
649 self.callrecord_sender = Some(sender);
650 self
651 }
652
653 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
654 self.cancel_token = Some(token);
655 self
656 }
657
658 pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
659 self.config_path = path;
660 self
661 }
662
663 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
664 self.message_inspector = Some(inspector);
665 self
666 }
667 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
668 self.target_locator = Some(locator);
669 self
670 }
671
672 pub fn with_transport_inspector(
673 &mut self,
674 inspector: Box<dyn TransportEventInspector>,
675 ) -> &mut Self {
676 self.transport_inspector = Some(inspector);
677 self
678 }
679
680 pub async fn build(self) -> Result<AppState> {
681 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
682 let token = self
683 .cancel_token
684 .unwrap_or_else(|| CancellationToken::new());
685 let _ = set_cache_dir(&config.media_cache_path);
686
687 let local_ip = if !config.addr.is_empty() {
688 std::net::IpAddr::from_str(config.addr.as_str())?
689 } else {
690 crate::net_tool::get_first_non_loopback_interface()?
691 };
692
693 let endpoint_cancel_token = CancellationToken::new();
694 let transport_layer =
695 rsipstack::transport::TransportLayer::new(endpoint_cancel_token.clone());
696 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
697
698 let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
699 local_addr,
700 None,
701 Some(endpoint_cancel_token.child_token()),
702 )
703 .await
704 .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
705
706 transport_layer.add_transport(udp_conn.into());
707 info!("start useragent, addr: {}", local_addr);
708
709 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
710 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
711 if let Some(ref user_agent) = config.useragent {
712 endpoint_builder.with_user_agent(user_agent.as_str());
713 }
714
715 let mut endpoint_builder = endpoint_builder
716 .with_cancel_token(endpoint_cancel_token.clone())
717 .with_transport_layer(transport_layer)
718 .with_option(endpoint_option);
719
720 if let Some(inspector) = self.message_inspector {
721 endpoint_builder = endpoint_builder.with_inspector(inspector);
722 }
723
724 if let Some(locator) = self.target_locator {
725 endpoint_builder.with_target_locator(locator);
726 } else if let Some(ref rules) = config.rewrites {
727 endpoint_builder
728 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
729 }
730
731 if let Some(inspector) = self.transport_inspector {
732 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
733 }
734
735 let endpoint = endpoint_builder.build();
736 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
737
738 let stream_engine = self.stream_engine.unwrap_or_default();
739
740 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
741 formatter
742 } else {
743 let formatter = if let Some(ref callrecord) = config.callrecord {
744 DefaultCallRecordFormatter::new_with_config(callrecord)
745 } else {
746 DefaultCallRecordFormatter::default()
747 };
748 Arc::new(formatter)
749 };
750
751 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
752 Some(sender)
753 } else if let Some(ref callrecord) = config.callrecord {
754 let builder = CallRecordManagerBuilder::new()
755 .with_cancel_token(token.child_token())
756 .with_config(callrecord.clone())
757 .with_max_concurrent(32)
758 .with_formatter(callrecord_formatter.clone());
759
760 let mut callrecord_manager = builder.build();
761 let sender = callrecord_manager.sender.clone();
762 crate::spawn(async move {
763 callrecord_manager.serve().await;
764 });
765 Some(sender)
766 } else {
767 None
768 };
769
770 let app_state = Arc::new(AppStateInner {
771 config,
772 token,
773 stream_engine,
774 callrecord_sender,
775 endpoint,
776 registration_handles: Mutex::new(HashMap::new()),
777 alive_users: Arc::new(RwLock::new(HashSet::new())),
778 dialog_layer: dialog_layer.clone(),
779 create_invitation_handler: self.create_invitation_handler,
780 invitation: Invitation::new(dialog_layer),
781 routing_state: Arc::new(crate::call::RoutingState::new()),
782 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
783 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
784 total_calls: AtomicU64::new(0),
785 total_failed_calls: AtomicU64::new(0),
786 uptime: Local::now(),
787 endpoint_token: endpoint_cancel_token,
788 });
789
790 Ok(app_state)
791 }
792}