1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::FnCreateInvitationHandler,
11 invitation::{PendingDialog, PendingDialogGuard, default_create_invite_handler},
12 registration::RegistrationHandle,
13 },
14};
15
16use crate::media::{cache::set_cache_dir, engine::StreamEngine};
17use anyhow::Result;
18use chrono::{DateTime, Utc};
19use humantime::parse_duration;
20use rsip::prelude::HeadersExt;
21use rsipstack::transaction::{
22 Endpoint, TransactionReceiver,
23 endpoint::{TargetLocator, TransportEventInspector},
24};
25use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
26use std::collections::HashSet;
27use std::str::FromStr;
28use std::sync::{Arc, RwLock};
29use std::time::Duration;
30use std::{collections::HashMap, net::SocketAddr};
31use std::{path::Path, sync::atomic::AtomicU64};
32use tokio::select;
33use tokio::sync::Mutex;
34use tokio_util::sync::CancellationToken;
35use tracing::{info, warn};
36
37pub struct AppStateInner {
38 pub config: Arc<Config>,
39 pub token: CancellationToken,
40 pub stream_engine: Arc<StreamEngine>,
41 pub callrecord_sender: Option<CallRecordSender>,
42 pub endpoint: Endpoint,
43 pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
44 pub alive_users: Arc<RwLock<HashSet<String>>>,
45 pub dialog_layer: Arc<DialogLayer>,
46 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
47 pub invitation: Invitation,
48 pub routing_state: Arc<crate::call::RoutingState>,
49 pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
50
51 pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
52 pub total_calls: AtomicU64,
53 pub total_failed_calls: AtomicU64,
54}
55
56pub type AppState = Arc<AppStateInner>;
57
58pub struct AppStateBuilder {
59 pub config: Option<Config>,
60 pub stream_engine: Option<Arc<StreamEngine>>,
61 pub callrecord_sender: Option<CallRecordSender>,
62 pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
63 pub cancel_token: Option<CancellationToken>,
64 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
65 pub config_loaded_at: Option<DateTime<Utc>>,
66 pub config_path: Option<String>,
67
68 pub message_inspector: Option<Box<dyn MessageInspector>>,
69 pub target_locator: Option<Box<dyn TargetLocator>>,
70 pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
71}
72
73impl AppStateInner {
74 pub fn get_dump_events_file(&self, session_id: &String) -> String {
75 let recorder_root = self.config.recorder_path();
76 let root = Path::new(&recorder_root);
77 if !root.exists() {
78 match std::fs::create_dir_all(root) {
79 Ok(_) => {
80 info!("created dump events root: {}", root.to_string_lossy());
81 }
82 Err(e) => {
83 warn!(
84 "Failed to create dump events root: {} {}",
85 e,
86 root.to_string_lossy()
87 );
88 }
89 }
90 }
91 root.join(format!("{}.events.jsonl", session_id))
92 .to_string_lossy()
93 .to_string()
94 }
95
96 pub fn get_recorder_file(&self, session_id: &String) -> String {
97 let recorder_root = self.config.recorder_path();
98 let root = Path::new(&recorder_root);
99 if !root.exists() {
100 match std::fs::create_dir_all(root) {
101 Ok(_) => {
102 info!("created recorder root: {}", root.to_string_lossy());
103 }
104 Err(e) => {
105 warn!(
106 "Failed to create recorder root: {} {}",
107 e,
108 root.to_string_lossy()
109 );
110 }
111 }
112 }
113 let desired_ext = self.config.recorder_format().extension();
114 let mut filename = session_id.clone();
115 if !filename
116 .to_lowercase()
117 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
118 {
119 filename = format!("{}.{}", filename, desired_ext);
120 }
121 root.join(filename).to_string_lossy().to_string()
122 }
123
124 pub async fn serve(self: Arc<Self>) -> Result<()> {
125 let incoming_txs = self.endpoint.incoming_transactions()?;
126 let token = self.token.child_token();
127 let endpoint_inner = self.endpoint.inner.clone();
128 let dialog_layer = self.dialog_layer.clone();
129 let app_state_clone = self.clone();
130
131 match self.start_registration().await {
132 Ok(count) => {
133 info!("registration started, count: {}", count);
134 }
135 Err(e) => {
136 warn!("failed to start registration: {:?}", e);
137 }
138 }
139
140 tokio::select! {
141 _ = token.cancelled() => {
142 info!("cancelled");
143 }
144 result = endpoint_inner.serve() => {
145 if let Err(e) = result {
146 info!("endpoint serve error: {:?}", e);
147 }
148 }
149 result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
150 if let Err(e) = result {
151 info!("process incoming request error: {:?}", e);
152 }
153 },
154 }
155
156 let timeout = self
159 .config
160 .graceful_shutdown
161 .map(|_| Duration::from_secs(50));
162
163 match self.stop_registration(timeout).await {
164 Ok(_) => {
165 info!("registration stopped, waiting for clear");
166 }
167 Err(e) => {
168 warn!("failed to stop registration: {:?}", e);
169 }
170 }
171 info!("stopping");
172 Ok(())
173 }
174
175 async fn process_incoming_request(
176 self: Arc<Self>,
177 dialog_layer: Arc<DialogLayer>,
178 mut incoming: TransactionReceiver,
179 ) -> Result<()> {
180 while let Some(mut tx) = incoming.recv().await {
181 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
182 info!(?key, "received transaction");
183 if tx.original.to_header()?.tag()?.as_ref().is_some() {
184 match dialog_layer.match_dialog(&tx.original) {
185 Some(mut d) => {
186 crate::spawn(async move {
187 match d.handle(&mut tx).await {
188 Ok(_) => (),
189 Err(e) => {
190 info!("error handling transaction: {:?}", e);
191 }
192 }
193 });
194 continue;
195 }
196 None => {
197 info!("dialog not found: {}", tx.original);
198 match tx
199 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
200 .await
201 {
202 Ok(_) => (),
203 Err(e) => {
204 info!("error replying to request: {:?}", e);
205 }
206 }
207 continue;
208 }
209 }
210 }
211 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
213 match tx.original.method {
214 rsip::Method::Invite | rsip::Method::Ack => {
215 let invitation_handler = match self.create_invitation_handler {
216 Some(ref create_invitation_handler) => {
217 create_invitation_handler(self.config.handler.as_ref()).ok()
218 }
219 _ => default_create_invite_handler(
220 self.config.handler.as_ref(),
221 Some(self.clone()),
222 ),
223 };
224 let invitation_handler = match invitation_handler {
225 Some(h) => h,
226 None => {
227 info!(?key, "no invite handler configured, rejecting INVITE");
228 match tx
229 .reply_with(
230 rsip::StatusCode::ServiceUnavailable,
231 vec![rsip::Header::Other(
232 "Reason".into(),
233 "SIP;cause=503;text=\"No invite handler configured\""
234 .into(),
235 )],
236 None,
237 )
238 .await
239 {
240 Ok(_) => (),
241 Err(e) => {
242 info!("error replying to request: {:?}", e);
243 }
244 }
245 continue;
246 }
247 };
248 let contact = dialog_layer
249 .endpoint
250 .get_addrs()
251 .first()
252 .map(|addr| rsip::Uri {
253 scheme: Some(rsip::Scheme::Sip),
254 auth: None,
255 host_with_port: addr.addr.clone(),
256 params: vec![],
257 headers: vec![],
258 });
259
260 let dialog = match dialog_layer.get_or_create_server_invite(
261 &tx,
262 state_sender,
263 None,
264 contact,
265 ) {
266 Ok(d) => d,
267 Err(e) => {
268 info!("failed to obtain dialog: {:?}", e);
270 match tx
271 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
272 .await
273 {
274 Ok(_) => (),
275 Err(e) => {
276 info!("error replying to request: {:?}", e);
277 }
278 }
279 continue;
280 }
281 };
282
283 let dialog_id_str = dialog.id().to_string();
284 let token = self.token.child_token();
285 let pending_dialog = PendingDialog {
286 token: token.clone(),
287 dialog: dialog.clone(),
288 state_receiver,
289 };
290
291 let guard = Arc::new(PendingDialogGuard::new(
292 self.invitation.clone(),
293 dialog_id_str.clone(),
294 pending_dialog,
295 ));
296
297 let accept_timeout = self
298 .config
299 .accept_timeout
300 .as_ref()
301 .and_then(|t| parse_duration(t).ok())
302 .unwrap_or_else(|| Duration::from_secs(60));
303
304 let token_ref = token.clone();
305 let guard_ref = guard.clone();
306 crate::spawn(async move {
307 select! {
308 _ = token_ref.cancelled() => {}
309 _ = tokio::time::sleep(accept_timeout) => {}
310 }
311 guard_ref.drop_async().await;
312 });
313
314 let mut dialog_ref = dialog.clone();
315 let token_ref = token.clone();
316 let routing_state = self.routing_state.clone();
317 crate::spawn(async move {
318 let invite_loop = async {
319 match invitation_handler
320 .on_invite(
321 dialog_id_str.clone(),
322 token,
323 dialog.clone(),
324 routing_state,
325 )
326 .await
327 {
328 Ok(_) => (),
329 Err(e) => {
330 info!(id = dialog_id_str, "error handling invite: {:?}", e);
331 guard.drop_async().await;
332 }
333 }
334 };
335 select! {
336 _ = token_ref.cancelled() => {}
337 _ = async {
338 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
339 } => {}
340 }
341 });
342 }
343 rsip::Method::Options => {
344 info!(?key, "ignoring out-of-dialog OPTIONS request");
345 continue;
346 }
347 _ => {
348 info!(?key, "received request: {:?}", tx.original.method);
349 match tx.reply(rsip::StatusCode::OK).await {
350 Ok(_) => (),
351 Err(e) => {
352 info!("error replying to request: {:?}", e);
353 }
354 }
355 }
356 }
357 }
358 Ok(())
359 }
360
361 pub fn stop(&self) {
362 info!("stopping");
363 self.token.cancel();
364 }
365
366 pub async fn start_registration(&self) -> Result<usize> {
367 let mut count = 0;
368 if let Some(register_users) = &self.config.register_users {
369 for option in register_users.iter() {
370 match self.register(option.clone()).await {
371 Ok(_) => {
372 count += 1;
373 }
374 Err(e) => {
375 warn!("failed to register user: {:?} {:?}", e, option);
376 }
377 }
378 }
379 }
380 Ok(count)
381 }
382
383 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
384 for (_, handle) in self.registration_handles.lock().await.iter_mut() {
385 handle.stop();
386 }
387
388 if let Some(duration) = wait_for_clear {
389 let live_users = self.alive_users.clone();
390 let check_loop = async move {
391 loop {
392 let is_empty = {
393 let users = live_users
394 .read()
395 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
396 users.is_empty()
397 };
398 if is_empty {
399 break;
400 }
401 tokio::time::sleep(Duration::from_millis(50)).await;
402 }
403 Ok::<(), anyhow::Error>(())
404 };
405 match tokio::time::timeout(duration, check_loop).await {
406 Ok(_) => {}
407 Err(e) => {
408 warn!("failed to wait for clear: {}", e);
409 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
410 }
411 }
412 }
413 Ok(())
414 }
415
416 pub async fn register(&self, option: RegisterOption) -> Result<()> {
417 let user = option.aor();
418 let mut server = option.server.clone();
419 if !server.starts_with("sip:") && !server.starts_with("sips:") {
420 server = format!("sip:{}", server);
421 }
422 let sip_server = match rsip::Uri::try_from(server) {
423 Ok(uri) => uri,
424 Err(e) => {
425 warn!("failed to parse server: {} {:?}", e, option.server);
426 return Err(anyhow::anyhow!("failed to parse server: {}", e));
427 }
428 };
429 let cancel_token = self.token.child_token();
430 let handle = RegistrationHandle {
431 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
432 endpoint_inner: self.endpoint.inner.clone(),
433 option,
434 cancel_token,
435 start_time: Mutex::new(std::time::Instant::now()),
436 last_update: Mutex::new(std::time::Instant::now()),
437 last_response: Mutex::new(None),
438 }),
439 };
440 self.registration_handles
441 .lock()
442 .await
443 .insert(user.clone(), handle.clone());
444 let alive_users = self.alive_users.clone();
445
446 crate::spawn(async move {
447 *handle.inner.start_time.lock().await = std::time::Instant::now();
448
449 select! {
450 _ = handle.inner.cancel_token.cancelled() => {
451 }
452 _ = async {
453 loop {
454 let user = handle.inner.option.aor();
455 alive_users.write().unwrap().remove(&user);
456 let refresh_time = match handle.do_register(&sip_server, None).await {
457 Ok(expires) => {
458 info!(
459 user = handle.inner.option.aor(),
460 expires = expires,
461 alive_users = alive_users.read().unwrap().len(),
462 "registration refreshed",
463 );
464 alive_users.write().unwrap().insert(user);
465 expires * 3 / 4 }
467 Err(e) => {
468 warn!(
469 user = handle.inner.option.aor(),
470 alive_users = alive_users.read().unwrap().len(),
471 "registration failed: {:?}", e);
472 60
473 }
474 };
475 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
476 }
477 } => {}
478 }
479 handle.do_register(&sip_server, Some(0)).await.ok();
480 alive_users.write().unwrap().remove(&user);
481 });
482 Ok(())
483 }
484}
485
486impl Drop for AppStateInner {
487 fn drop(&mut self) {
488 self.stop();
489 }
490}
491
492impl AppStateBuilder {
493 pub fn new() -> Self {
494 Self {
495 config: None,
496 stream_engine: None,
497 callrecord_sender: None,
498 callrecord_formatter: None,
499 cancel_token: None,
500 create_invitation_handler: None,
501 config_loaded_at: None,
502 config_path: None,
503 message_inspector: None,
504 target_locator: None,
505 transport_inspector: None,
506 }
507 }
508
509 pub fn with_config(mut self, config: Config) -> Self {
510 self.config = Some(config);
511 if self.config_loaded_at.is_none() {
512 self.config_loaded_at = Some(Utc::now());
513 }
514 self
515 }
516
517 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
518 self.stream_engine = Some(stream_engine);
519 self
520 }
521
522 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
523 self.callrecord_sender = Some(sender);
524 self
525 }
526
527 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
528 self.cancel_token = Some(token);
529 self
530 }
531
532 pub fn with_config_metadata(mut self, path: Option<String>, loaded_at: DateTime<Utc>) -> Self {
533 self.config_path = path;
534 self.config_loaded_at = Some(loaded_at);
535 self
536 }
537
538 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
539 self.message_inspector = Some(inspector);
540 self
541 }
542 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
543 self.target_locator = Some(locator);
544 self
545 }
546
547 pub fn with_transport_inspector(
548 &mut self,
549 inspector: Box<dyn TransportEventInspector>,
550 ) -> &mut Self {
551 self.transport_inspector = Some(inspector);
552 self
553 }
554
555 pub async fn build(self) -> Result<AppState> {
556 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
557 let token = self
558 .cancel_token
559 .unwrap_or_else(|| CancellationToken::new());
560 let _ = set_cache_dir(&config.media_cache_path);
561
562 let local_ip = if !config.addr.is_empty() {
563 std::net::IpAddr::from_str(config.addr.as_str())?
564 } else {
565 crate::net_tool::get_first_non_loopback_interface()?
566 };
567 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
568 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
569
570 let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
571 local_addr,
572 None,
573 Some(token.child_token()),
574 )
575 .await
576 .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
577
578 transport_layer.add_transport(udp_conn.into());
579 info!("start useragent, addr: {}", local_addr);
580
581 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
582 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
583 if let Some(ref user_agent) = config.useragent {
584 endpoint_builder.with_user_agent(user_agent.as_str());
585 }
586
587 let mut endpoint_builder = endpoint_builder
588 .with_cancel_token(token.child_token())
589 .with_transport_layer(transport_layer)
590 .with_option(endpoint_option);
591
592 if let Some(inspector) = self.message_inspector {
593 endpoint_builder = endpoint_builder.with_inspector(inspector);
594 }
595
596 if let Some(locator) = self.target_locator {
597 endpoint_builder.with_target_locator(locator);
598 } else if let Some(ref rules) = config.rewrites {
599 endpoint_builder
600 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
601 }
602
603 if let Some(inspector) = self.transport_inspector {
604 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
605 }
606
607 let endpoint = endpoint_builder.build();
608 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
609
610 let stream_engine = self.stream_engine.unwrap_or_default();
611
612 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
613 formatter
614 } else {
615 let formatter = if let Some(ref callrecord) = config.callrecord {
616 DefaultCallRecordFormatter::new_with_config(callrecord)
617 } else {
618 DefaultCallRecordFormatter::default()
619 };
620 Arc::new(formatter)
621 };
622
623 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
624 Some(sender)
625 } else if let Some(ref callrecord) = config.callrecord {
626 let builder = CallRecordManagerBuilder::new()
627 .with_cancel_token(token.child_token())
628 .with_config(callrecord.clone())
629 .with_max_concurrent(32)
630 .with_formatter(callrecord_formatter.clone());
631
632 let mut callrecord_manager = builder.build();
633 let sender = callrecord_manager.sender.clone();
634 crate::spawn(async move {
635 callrecord_manager.serve().await;
636 });
637 Some(sender)
638 } else {
639 None
640 };
641
642 let app_state = Arc::new(AppStateInner {
643 config,
644 token,
645 stream_engine,
646 callrecord_sender,
647 endpoint,
648 registration_handles: Mutex::new(HashMap::new()),
649 alive_users: Arc::new(RwLock::new(HashSet::new())),
650 dialog_layer: dialog_layer.clone(),
651 create_invitation_handler: self.create_invitation_handler,
652 invitation: Invitation::new(dialog_layer),
653 routing_state: Arc::new(crate::call::RoutingState::new()),
654 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
655 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
656 total_calls: AtomicU64::new(0),
657 total_failed_calls: AtomicU64::new(0),
658 });
659
660 Ok(app_state)
661 }
662}