1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::{
11 FnCreateInvitationHandler, PendingDialog, PendingDialogGuard,
12 default_create_invite_handler,
13 },
14 registration::{RegistrationHandle, UserCredential},
15 },
16};
17
18use crate::media::{cache::set_cache_dir, engine::StreamEngine};
19use anyhow::Result;
20use chrono::{DateTime, Local};
21use humantime::parse_duration;
22use rsip::prelude::HeadersExt;
23use rsipstack::transaction::{
24 Endpoint, TransactionReceiver,
25 endpoint::{TargetLocator, TransportEventInspector},
26};
27use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
28use std::collections::HashSet;
29use std::str::FromStr;
30use std::sync::{Arc, RwLock};
31use std::time::Duration;
32use std::{collections::HashMap, net::SocketAddr};
33use std::{path::Path, sync::atomic::AtomicU64};
34use tokio::select;
35use tokio::sync::Mutex;
36use tokio_util::sync::CancellationToken;
37use tracing::{info, warn};
38
39pub struct AppStateInner {
40 pub config: Arc<Config>,
41 pub token: CancellationToken,
42 pub stream_engine: Arc<StreamEngine>,
43 pub callrecord_sender: Option<CallRecordSender>,
44 pub endpoint: Endpoint,
45 pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
46 pub alive_users: Arc<RwLock<HashSet<String>>>,
47 pub dialog_layer: Arc<DialogLayer>,
48 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
49 pub invitation: Invitation,
50 pub routing_state: Arc<crate::call::RoutingState>,
51 pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
52
53 pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
54 pub total_calls: AtomicU64,
55 pub total_failed_calls: AtomicU64,
56 pub uptime: DateTime<Local>,
57}
58
59pub type AppState = Arc<AppStateInner>;
60
61pub struct AppStateBuilder {
62 pub config: Option<Config>,
63 pub stream_engine: Option<Arc<StreamEngine>>,
64 pub callrecord_sender: Option<CallRecordSender>,
65 pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
66 pub cancel_token: Option<CancellationToken>,
67 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
68 pub config_path: Option<String>,
69
70 pub message_inspector: Option<Box<dyn MessageInspector>>,
71 pub target_locator: Option<Box<dyn TargetLocator>>,
72 pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
73}
74
75impl AppStateInner {
76 pub fn get_dump_events_file(&self, session_id: &String) -> String {
77 let recorder_root = self.config.recorder_path();
78 let root = Path::new(&recorder_root);
79 if !root.exists() {
80 match std::fs::create_dir_all(root) {
81 Ok(_) => {
82 info!("created dump events root: {}", root.to_string_lossy());
83 }
84 Err(e) => {
85 warn!(
86 "Failed to create dump events root: {} {}",
87 e,
88 root.to_string_lossy()
89 );
90 }
91 }
92 }
93 root.join(format!("{}.events.jsonl", session_id))
94 .to_string_lossy()
95 .to_string()
96 }
97
98 pub fn get_recorder_file(&self, session_id: &String) -> String {
99 let recorder_root = self.config.recorder_path();
100 let root = Path::new(&recorder_root);
101 if !root.exists() {
102 match std::fs::create_dir_all(root) {
103 Ok(_) => {
104 info!("created recorder root: {}", root.to_string_lossy());
105 }
106 Err(e) => {
107 warn!(
108 "Failed to create recorder root: {} {}",
109 e,
110 root.to_string_lossy()
111 );
112 }
113 }
114 }
115 let desired_ext = self.config.recorder_format().extension();
116 let mut filename = session_id.clone();
117 if !filename
118 .to_lowercase()
119 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
120 {
121 filename = format!("{}.{}", filename, desired_ext);
122 }
123 root.join(filename).to_string_lossy().to_string()
124 }
125
126 pub async fn serve(self: Arc<Self>) -> Result<()> {
127 let incoming_txs = self.endpoint.incoming_transactions()?;
128 let token = self.token.child_token();
129 let endpoint_inner = self.endpoint.inner.clone();
130 let dialog_layer = self.dialog_layer.clone();
131 let app_state_clone = self.clone();
132
133 match self.start_registration().await {
134 Ok(count) => {
135 info!("registration started, count: {}", count);
136 }
137 Err(e) => {
138 warn!("failed to start registration: {:?}", e);
139 }
140 }
141
142 tokio::select! {
143 _ = token.cancelled() => {
144 info!("cancelled");
145 }
146 result = endpoint_inner.serve() => {
147 if let Err(e) = result {
148 info!("endpoint serve error: {:?}", e);
149 }
150 }
151 result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
152 if let Err(e) = result {
153 info!("process incoming request error: {:?}", e);
154 }
155 },
156 }
157
158 let timeout = self
161 .config
162 .graceful_shutdown
163 .map(|_| Duration::from_secs(50));
164
165 match self.stop_registration(timeout).await {
166 Ok(_) => {
167 info!("registration stopped, waiting for clear");
168 }
169 Err(e) => {
170 warn!("failed to stop registration: {:?}", e);
171 }
172 }
173 info!("stopping");
174 Ok(())
175 }
176
177 async fn process_incoming_request(
178 self: Arc<Self>,
179 dialog_layer: Arc<DialogLayer>,
180 mut incoming: TransactionReceiver,
181 ) -> Result<()> {
182 while let Some(mut tx) = incoming.recv().await {
183 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
184 info!(?key, "received transaction");
185 if tx.original.to_header()?.tag()?.as_ref().is_some() {
186 match dialog_layer.match_dialog(&tx.original) {
187 Some(mut d) => {
188 crate::spawn(async move {
189 match d.handle(&mut tx).await {
190 Ok(_) => (),
191 Err(e) => {
192 info!("error handling transaction: {:?}", e);
193 }
194 }
195 });
196 continue;
197 }
198 None => {
199 info!("dialog not found: {}", tx.original);
200 match tx
201 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
202 .await
203 {
204 Ok(_) => (),
205 Err(e) => {
206 info!("error replying to request: {:?}", e);
207 }
208 }
209 continue;
210 }
211 }
212 }
213 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
215 match tx.original.method {
216 rsip::Method::Invite | rsip::Method::Ack => {
217 let invitation_handler = match self.create_invitation_handler {
218 Some(ref create_invitation_handler) => {
219 create_invitation_handler(self.config.handler.as_ref()).ok()
220 }
221 _ => default_create_invite_handler(
222 self.config.handler.as_ref(),
223 Some(self.clone()),
224 ),
225 };
226 let invitation_handler = match invitation_handler {
227 Some(h) => h,
228 None => {
229 info!(?key, "no invite handler configured, rejecting INVITE");
230 match tx
231 .reply_with(
232 rsip::StatusCode::ServiceUnavailable,
233 vec![rsip::Header::Other(
234 "Reason".into(),
235 "SIP;cause=503;text=\"No invite handler configured\""
236 .into(),
237 )],
238 None,
239 )
240 .await
241 {
242 Ok(_) => (),
243 Err(e) => {
244 info!("error replying to request: {:?}", e);
245 }
246 }
247 continue;
248 }
249 };
250 let contact = dialog_layer
251 .endpoint
252 .get_addrs()
253 .first()
254 .map(|addr| rsip::Uri {
255 scheme: Some(rsip::Scheme::Sip),
256 auth: None,
257 host_with_port: addr.addr.clone(),
258 params: vec![],
259 headers: vec![],
260 });
261
262 let dialog = match dialog_layer.get_or_create_server_invite(
263 &tx,
264 state_sender,
265 None,
266 contact,
267 ) {
268 Ok(d) => d,
269 Err(e) => {
270 info!("failed to obtain dialog: {:?}", e);
272 match tx
273 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
274 .await
275 {
276 Ok(_) => (),
277 Err(e) => {
278 info!("error replying to request: {:?}", e);
279 }
280 }
281 continue;
282 }
283 };
284
285 let dialog_id = dialog.id();
286 let dialog_id_str = dialog_id.to_string();
287 let token = self.token.child_token();
288 let pending_dialog = PendingDialog {
289 token: token.clone(),
290 dialog: dialog.clone(),
291 state_receiver,
292 };
293
294 let guard = Arc::new(PendingDialogGuard::new(
295 self.invitation.clone(),
296 dialog_id,
297 pending_dialog,
298 ));
299
300 let accept_timeout = self
301 .config
302 .accept_timeout
303 .as_ref()
304 .and_then(|t| parse_duration(t).ok())
305 .unwrap_or_else(|| Duration::from_secs(60));
306
307 let token_ref = token.clone();
308 let guard_ref = guard.clone();
309 crate::spawn(async move {
310 select! {
311 _ = token_ref.cancelled() => {}
312 _ = tokio::time::sleep(accept_timeout) => {}
313 }
314 guard_ref.drop_async().await;
315 });
316
317 let mut dialog_ref = dialog.clone();
318 let token_ref = token.clone();
319 let routing_state = self.routing_state.clone();
320 crate::spawn(async move {
321 let invite_loop = async {
322 match invitation_handler
323 .on_invite(
324 dialog_id_str.clone(),
325 token,
326 dialog.clone(),
327 routing_state,
328 )
329 .await
330 {
331 Ok(_) => (),
332 Err(e) => {
333 info!(id = dialog_id_str, "error handling invite: {:?}", e);
334 guard.drop_async().await;
335 }
336 }
337 };
338 select! {
339 _ = token_ref.cancelled() => {}
340 _ = async {
341 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
342 } => {}
343 }
344 });
345 }
346 rsip::Method::Options => {
347 info!(?key, "ignoring out-of-dialog OPTIONS request");
348 continue;
349 }
350 _ => {
351 info!(?key, "received request: {:?}", tx.original.method);
352 match tx.reply(rsip::StatusCode::OK).await {
353 Ok(_) => (),
354 Err(e) => {
355 info!("error replying to request: {:?}", e);
356 }
357 }
358 }
359 }
360 }
361 Ok(())
362 }
363
364 pub fn stop(&self) {
365 info!("stopping");
366 self.token.cancel();
367 }
368
369 pub async fn start_registration(&self) -> Result<usize> {
370 let mut count = 0;
371 if let Some(register_users) = &self.config.register_users {
372 for option in register_users.iter() {
373 match self.register(option.clone()).await {
374 Ok(_) => {
375 count += 1;
376 }
377 Err(e) => {
378 warn!("failed to register user: {:?} {:?}", e, option);
379 }
380 }
381 }
382 }
383 Ok(count)
384 }
385
386 pub fn find_credentials_for_callee(&self, callee: &str) -> Option<UserCredential> {
387 let callee_uri = callee
388 .strip_prefix("sip:")
389 .or_else(|| callee.strip_prefix("sips:"))
390 .unwrap_or(callee);
391 let callee_uri = if !callee_uri.starts_with("sip:") && !callee_uri.starts_with("sips:") {
392 format!("sip:{}", callee_uri)
393 } else {
394 callee_uri.to_string()
395 };
396
397 let parsed_callee = match rsip::Uri::try_from(callee_uri.as_str()) {
398 Ok(uri) => uri,
399 Err(e) => {
400 warn!("failed to parse callee URI: {} {:?}", callee, e);
401 return None;
402 }
403 };
404
405 let callee_host = match &parsed_callee.host_with_port.host {
406 rsip::Host::Domain(domain) => domain.to_string(),
407 rsip::Host::IpAddr(ip) => return self.find_credentials_by_ip(ip),
408 };
409
410 if let Some(register_users) = &self.config.register_users {
412 for option in register_users.iter() {
413 let mut server = option.server.clone();
414 if !server.starts_with("sip:") && !server.starts_with("sips:") {
415 server = format!("sip:{}", server);
416 }
417
418 let parsed_server = match rsip::Uri::try_from(server.as_str()) {
419 Ok(uri) => uri,
420 Err(e) => {
421 warn!("failed to parse server URI: {} {:?}", option.server, e);
422 continue;
423 }
424 };
425
426 let server_host = match &parsed_server.host_with_port.host {
427 rsip::Host::Domain(domain) => domain.to_string(),
428 rsip::Host::IpAddr(ip) => {
429 if let rsip::Host::IpAddr(callee_ip) = &parsed_callee.host_with_port.host {
431 if ip == callee_ip {
432 if let Some(cred) = &option.credential {
433 info!(
434 callee,
435 username = cred.username,
436 server = option.server,
437 "Auto-injecting credentials from registered user for outbound call (IP match)"
438 );
439 return Some(cred.clone());
440 }
441 }
442 }
443 continue;
444 }
445 };
446
447 if server_host == callee_host {
448 if let Some(cred) = &option.credential {
449 info!(
450 callee,
451 username = cred.username,
452 server = option.server,
453 "Auto-injecting credentials from registered user for outbound call"
454 );
455 return Some(cred.clone());
456 }
457 }
458 }
459 }
460
461 None
462 }
463
464 fn find_credentials_by_ip(
466 &self,
467 callee_ip: &std::net::IpAddr,
468 ) -> Option<crate::useragent::registration::UserCredential> {
469 if let Some(register_users) = &self.config.register_users {
470 for option in register_users.iter() {
471 let mut server = option.server.clone();
472 if !server.starts_with("sip:") && !server.starts_with("sips:") {
473 server = format!("sip:{}", server);
474 }
475
476 if let Ok(parsed_server) = rsip::Uri::try_from(server.as_str()) {
477 if let rsip::Host::IpAddr(server_ip) = &parsed_server.host_with_port.host {
478 if server_ip == callee_ip {
479 if let Some(cred) = &option.credential {
480 info!(
481 callee_ip = %callee_ip,
482 username = cred.username,
483 server = option.server,
484 "Auto-injecting credentials from registered user for outbound call (IP match)"
485 );
486 return Some(cred.clone());
487 }
488 }
489 }
490 }
491 }
492 }
493 None
494 }
495
496 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
497 for (_, handle) in self.registration_handles.lock().await.iter_mut() {
498 handle.stop();
499 }
500
501 if let Some(duration) = wait_for_clear {
502 let live_users = self.alive_users.clone();
503 let check_loop = async move {
504 loop {
505 let is_empty = {
506 let users = live_users
507 .read()
508 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
509 users.is_empty()
510 };
511 if is_empty {
512 break;
513 }
514 tokio::time::sleep(Duration::from_millis(50)).await;
515 }
516 Ok::<(), anyhow::Error>(())
517 };
518 match tokio::time::timeout(duration, check_loop).await {
519 Ok(_) => {}
520 Err(e) => {
521 warn!("failed to wait for clear: {}", e);
522 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
523 }
524 }
525 }
526 Ok(())
527 }
528
529 pub async fn register(&self, option: RegisterOption) -> Result<()> {
530 let user = option.aor();
531 let mut server = option.server.clone();
532 if !server.starts_with("sip:") && !server.starts_with("sips:") {
533 server = format!("sip:{}", server);
534 }
535 let sip_server = match rsip::Uri::try_from(server) {
536 Ok(uri) => uri,
537 Err(e) => {
538 warn!("failed to parse server: {} {:?}", e, option.server);
539 return Err(anyhow::anyhow!("failed to parse server: {}", e));
540 }
541 };
542 let cancel_token = self.token.child_token();
543 let handle = RegistrationHandle {
544 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
545 endpoint_inner: self.endpoint.inner.clone(),
546 option,
547 cancel_token,
548 start_time: Mutex::new(std::time::Instant::now()),
549 last_update: Mutex::new(std::time::Instant::now()),
550 last_response: Mutex::new(None),
551 }),
552 };
553 self.registration_handles
554 .lock()
555 .await
556 .insert(user.clone(), handle.clone());
557 tracing::debug!(user = user.as_str(), "starting registration task");
558 let alive_users = self.alive_users.clone();
559
560 crate::spawn(async move {
561 *handle.inner.start_time.lock().await = std::time::Instant::now();
562
563 select! {
564 _ = handle.inner.cancel_token.cancelled() => {
565 }
566 _ = async {
567 loop {
568 let user = handle.inner.option.aor();
569 alive_users.write().unwrap().remove(&user);
570 let refresh_time = match handle.do_register(&sip_server, None).await {
571 Ok(expires) => {
572 info!(
573 user = handle.inner.option.aor(),
574 expires = expires,
575 alive_users = alive_users.read().unwrap().len(),
576 "registration refreshed",
577 );
578 alive_users.write().unwrap().insert(user);
579 expires * 3 / 4 }
581 Err(e) => {
582 warn!(
583 user = handle.inner.option.aor(),
584 alive_users = alive_users.read().unwrap().len(),
585 "registration failed: {:?}", e);
586 60
587 }
588 };
589 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
590 }
591 } => {}
592 }
593 handle.do_register(&sip_server, Some(0)).await.ok();
594 alive_users.write().unwrap().remove(&user);
595 });
596 Ok(())
597 }
598}
599
600impl Drop for AppStateInner {
601 fn drop(&mut self) {
602 self.stop();
603 }
604}
605
606impl AppStateBuilder {
607 pub fn new() -> Self {
608 Self {
609 config: None,
610 stream_engine: None,
611 callrecord_sender: None,
612 callrecord_formatter: None,
613 cancel_token: None,
614 create_invitation_handler: None,
615 config_path: None,
616 message_inspector: None,
617 target_locator: None,
618 transport_inspector: None,
619 }
620 }
621
622 pub fn with_config(mut self, config: Config) -> Self {
623 self.config = Some(config);
624 self
625 }
626
627 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
628 self.stream_engine = Some(stream_engine);
629 self
630 }
631
632 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
633 self.callrecord_sender = Some(sender);
634 self
635 }
636
637 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
638 self.cancel_token = Some(token);
639 self
640 }
641
642 pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
643 self.config_path = path;
644 self
645 }
646
647 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
648 self.message_inspector = Some(inspector);
649 self
650 }
651 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
652 self.target_locator = Some(locator);
653 self
654 }
655
656 pub fn with_transport_inspector(
657 &mut self,
658 inspector: Box<dyn TransportEventInspector>,
659 ) -> &mut Self {
660 self.transport_inspector = Some(inspector);
661 self
662 }
663
664 pub async fn build(self) -> Result<AppState> {
665 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
666 let token = self
667 .cancel_token
668 .unwrap_or_else(|| CancellationToken::new());
669 let _ = set_cache_dir(&config.media_cache_path);
670
671 let local_ip = if !config.addr.is_empty() {
672 std::net::IpAddr::from_str(config.addr.as_str())?
673 } else {
674 crate::net_tool::get_first_non_loopback_interface()?
675 };
676 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
677 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
678
679 let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
680 local_addr,
681 None,
682 Some(token.child_token()),
683 )
684 .await
685 .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
686
687 transport_layer.add_transport(udp_conn.into());
688 info!("start useragent, addr: {}", local_addr);
689
690 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
691 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
692 if let Some(ref user_agent) = config.useragent {
693 endpoint_builder.with_user_agent(user_agent.as_str());
694 }
695
696 let mut endpoint_builder = endpoint_builder
697 .with_cancel_token(token.child_token())
698 .with_transport_layer(transport_layer)
699 .with_option(endpoint_option);
700
701 if let Some(inspector) = self.message_inspector {
702 endpoint_builder = endpoint_builder.with_inspector(inspector);
703 }
704
705 if let Some(locator) = self.target_locator {
706 endpoint_builder.with_target_locator(locator);
707 } else if let Some(ref rules) = config.rewrites {
708 endpoint_builder
709 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
710 }
711
712 if let Some(inspector) = self.transport_inspector {
713 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
714 }
715
716 let endpoint = endpoint_builder.build();
717 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
718
719 let stream_engine = self.stream_engine.unwrap_or_default();
720
721 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
722 formatter
723 } else {
724 let formatter = if let Some(ref callrecord) = config.callrecord {
725 DefaultCallRecordFormatter::new_with_config(callrecord)
726 } else {
727 DefaultCallRecordFormatter::default()
728 };
729 Arc::new(formatter)
730 };
731
732 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
733 Some(sender)
734 } else if let Some(ref callrecord) = config.callrecord {
735 let builder = CallRecordManagerBuilder::new()
736 .with_cancel_token(token.child_token())
737 .with_config(callrecord.clone())
738 .with_max_concurrent(32)
739 .with_formatter(callrecord_formatter.clone());
740
741 let mut callrecord_manager = builder.build();
742 let sender = callrecord_manager.sender.clone();
743 crate::spawn(async move {
744 callrecord_manager.serve().await;
745 });
746 Some(sender)
747 } else {
748 None
749 };
750
751 let app_state = Arc::new(AppStateInner {
752 config,
753 token,
754 stream_engine,
755 callrecord_sender,
756 endpoint,
757 registration_handles: Mutex::new(HashMap::new()),
758 alive_users: Arc::new(RwLock::new(HashSet::new())),
759 dialog_layer: dialog_layer.clone(),
760 create_invitation_handler: self.create_invitation_handler,
761 invitation: Invitation::new(dialog_layer),
762 routing_state: Arc::new(crate::call::RoutingState::new()),
763 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
764 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
765 total_calls: AtomicU64::new(0),
766 total_failed_calls: AtomicU64::new(0),
767 uptime: Local::now(),
768 });
769
770 Ok(app_state)
771 }
772}