1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::FnCreateInvitationHandler,
11 invitation::{PendingDialog, PendingDialogGuard, default_create_invite_handler},
12 registration::RegistrationHandle,
13 },
14};
15
16use crate::media::{cache::set_cache_dir, engine::StreamEngine};
17use anyhow::Result;
18use chrono::{DateTime, Local};
19use humantime::parse_duration;
20use rsip::prelude::HeadersExt;
21use rsipstack::transaction::{
22 Endpoint, TransactionReceiver,
23 endpoint::{TargetLocator, TransportEventInspector},
24};
25use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
26use std::collections::HashSet;
27use std::str::FromStr;
28use std::sync::{Arc, RwLock};
29use std::time::Duration;
30use std::{collections::HashMap, net::SocketAddr};
31use std::{path::Path, sync::atomic::AtomicU64};
32use tokio::select;
33use tokio::sync::Mutex;
34use tokio_util::sync::CancellationToken;
35use tracing::{info, warn};
36
37pub struct AppStateInner {
38 pub config: Arc<Config>,
39 pub token: CancellationToken,
40 pub stream_engine: Arc<StreamEngine>,
41 pub callrecord_sender: Option<CallRecordSender>,
42 pub endpoint: Endpoint,
43 pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
44 pub alive_users: Arc<RwLock<HashSet<String>>>,
45 pub dialog_layer: Arc<DialogLayer>,
46 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
47 pub invitation: Invitation,
48 pub routing_state: Arc<crate::call::RoutingState>,
49 pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
50
51 pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
52 pub total_calls: AtomicU64,
53 pub total_failed_calls: AtomicU64,
54 pub uptime: DateTime<Local>,
55}
56
57pub type AppState = Arc<AppStateInner>;
58
59pub struct AppStateBuilder {
60 pub config: Option<Config>,
61 pub stream_engine: Option<Arc<StreamEngine>>,
62 pub callrecord_sender: Option<CallRecordSender>,
63 pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
64 pub cancel_token: Option<CancellationToken>,
65 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
66 pub config_path: Option<String>,
67
68 pub message_inspector: Option<Box<dyn MessageInspector>>,
69 pub target_locator: Option<Box<dyn TargetLocator>>,
70 pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
71}
72
73impl AppStateInner {
74 pub fn get_dump_events_file(&self, session_id: &String) -> String {
75 let recorder_root = self.config.recorder_path();
76 let root = Path::new(&recorder_root);
77 if !root.exists() {
78 match std::fs::create_dir_all(root) {
79 Ok(_) => {
80 info!("created dump events root: {}", root.to_string_lossy());
81 }
82 Err(e) => {
83 warn!(
84 "Failed to create dump events root: {} {}",
85 e,
86 root.to_string_lossy()
87 );
88 }
89 }
90 }
91 root.join(format!("{}.events.jsonl", session_id))
92 .to_string_lossy()
93 .to_string()
94 }
95
96 pub fn get_recorder_file(&self, session_id: &String) -> String {
97 let recorder_root = self.config.recorder_path();
98 let root = Path::new(&recorder_root);
99 if !root.exists() {
100 match std::fs::create_dir_all(root) {
101 Ok(_) => {
102 info!("created recorder root: {}", root.to_string_lossy());
103 }
104 Err(e) => {
105 warn!(
106 "Failed to create recorder root: {} {}",
107 e,
108 root.to_string_lossy()
109 );
110 }
111 }
112 }
113 let desired_ext = self.config.recorder_format().extension();
114 let mut filename = session_id.clone();
115 if !filename
116 .to_lowercase()
117 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
118 {
119 filename = format!("{}.{}", filename, desired_ext);
120 }
121 root.join(filename).to_string_lossy().to_string()
122 }
123
124 pub async fn serve(self: Arc<Self>) -> Result<()> {
125 let incoming_txs = self.endpoint.incoming_transactions()?;
126 let token = self.token.child_token();
127 let endpoint_inner = self.endpoint.inner.clone();
128 let dialog_layer = self.dialog_layer.clone();
129 let app_state_clone = self.clone();
130
131 match self.start_registration().await {
132 Ok(count) => {
133 info!("registration started, count: {}", count);
134 }
135 Err(e) => {
136 warn!("failed to start registration: {:?}", e);
137 }
138 }
139
140 tokio::select! {
141 _ = token.cancelled() => {
142 info!("cancelled");
143 }
144 result = endpoint_inner.serve() => {
145 if let Err(e) = result {
146 info!("endpoint serve error: {:?}", e);
147 }
148 }
149 result = app_state_clone.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
150 if let Err(e) = result {
151 info!("process incoming request error: {:?}", e);
152 }
153 },
154 }
155
156 let timeout = self
159 .config
160 .graceful_shutdown
161 .map(|_| Duration::from_secs(50));
162
163 match self.stop_registration(timeout).await {
164 Ok(_) => {
165 info!("registration stopped, waiting for clear");
166 }
167 Err(e) => {
168 warn!("failed to stop registration: {:?}", e);
169 }
170 }
171 info!("stopping");
172 Ok(())
173 }
174
175 async fn process_incoming_request(
176 self: Arc<Self>,
177 dialog_layer: Arc<DialogLayer>,
178 mut incoming: TransactionReceiver,
179 ) -> Result<()> {
180 while let Some(mut tx) = incoming.recv().await {
181 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
182 info!(?key, "received transaction");
183 if tx.original.to_header()?.tag()?.as_ref().is_some() {
184 match dialog_layer.match_dialog(&tx.original) {
185 Some(mut d) => {
186 crate::spawn(async move {
187 match d.handle(&mut tx).await {
188 Ok(_) => (),
189 Err(e) => {
190 info!("error handling transaction: {:?}", e);
191 }
192 }
193 });
194 continue;
195 }
196 None => {
197 info!("dialog not found: {}", tx.original);
198 match tx
199 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
200 .await
201 {
202 Ok(_) => (),
203 Err(e) => {
204 info!("error replying to request: {:?}", e);
205 }
206 }
207 continue;
208 }
209 }
210 }
211 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
213 match tx.original.method {
214 rsip::Method::Invite | rsip::Method::Ack => {
215 let invitation_handler = match self.create_invitation_handler {
216 Some(ref create_invitation_handler) => {
217 create_invitation_handler(self.config.handler.as_ref()).ok()
218 }
219 _ => default_create_invite_handler(
220 self.config.handler.as_ref(),
221 Some(self.clone()),
222 ),
223 };
224 let invitation_handler = match invitation_handler {
225 Some(h) => h,
226 None => {
227 info!(?key, "no invite handler configured, rejecting INVITE");
228 match tx
229 .reply_with(
230 rsip::StatusCode::ServiceUnavailable,
231 vec![rsip::Header::Other(
232 "Reason".into(),
233 "SIP;cause=503;text=\"No invite handler configured\""
234 .into(),
235 )],
236 None,
237 )
238 .await
239 {
240 Ok(_) => (),
241 Err(e) => {
242 info!("error replying to request: {:?}", e);
243 }
244 }
245 continue;
246 }
247 };
248 let contact = dialog_layer
249 .endpoint
250 .get_addrs()
251 .first()
252 .map(|addr| rsip::Uri {
253 scheme: Some(rsip::Scheme::Sip),
254 auth: None,
255 host_with_port: addr.addr.clone(),
256 params: vec![],
257 headers: vec![],
258 });
259
260 let dialog = match dialog_layer.get_or_create_server_invite(
261 &tx,
262 state_sender,
263 None,
264 contact,
265 ) {
266 Ok(d) => d,
267 Err(e) => {
268 info!("failed to obtain dialog: {:?}", e);
270 match tx
271 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
272 .await
273 {
274 Ok(_) => (),
275 Err(e) => {
276 info!("error replying to request: {:?}", e);
277 }
278 }
279 continue;
280 }
281 };
282
283 let dialog_id = dialog.id();
284 let dialog_id_str = dialog_id.to_string();
285 let token = self.token.child_token();
286 let pending_dialog = PendingDialog {
287 token: token.clone(),
288 dialog: dialog.clone(),
289 state_receiver,
290 };
291
292 let guard = Arc::new(PendingDialogGuard::new(
293 self.invitation.clone(),
294 dialog_id,
295 pending_dialog,
296 ));
297
298 let accept_timeout = self
299 .config
300 .accept_timeout
301 .as_ref()
302 .and_then(|t| parse_duration(t).ok())
303 .unwrap_or_else(|| Duration::from_secs(60));
304
305 let token_ref = token.clone();
306 let guard_ref = guard.clone();
307 crate::spawn(async move {
308 select! {
309 _ = token_ref.cancelled() => {}
310 _ = tokio::time::sleep(accept_timeout) => {}
311 }
312 guard_ref.drop_async().await;
313 });
314
315 let mut dialog_ref = dialog.clone();
316 let token_ref = token.clone();
317 let routing_state = self.routing_state.clone();
318 crate::spawn(async move {
319 let invite_loop = async {
320 match invitation_handler
321 .on_invite(
322 dialog_id_str.clone(),
323 token,
324 dialog.clone(),
325 routing_state,
326 )
327 .await
328 {
329 Ok(_) => (),
330 Err(e) => {
331 info!(id = dialog_id_str, "error handling invite: {:?}", e);
332 guard.drop_async().await;
333 }
334 }
335 };
336 select! {
337 _ = token_ref.cancelled() => {}
338 _ = async {
339 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
340 } => {}
341 }
342 });
343 }
344 rsip::Method::Options => {
345 info!(?key, "ignoring out-of-dialog OPTIONS request");
346 continue;
347 }
348 _ => {
349 info!(?key, "received request: {:?}", tx.original.method);
350 match tx.reply(rsip::StatusCode::OK).await {
351 Ok(_) => (),
352 Err(e) => {
353 info!("error replying to request: {:?}", e);
354 }
355 }
356 }
357 }
358 }
359 Ok(())
360 }
361
362 pub fn stop(&self) {
363 info!("stopping");
364 self.token.cancel();
365 }
366
367 pub async fn start_registration(&self) -> Result<usize> {
368 let mut count = 0;
369 if let Some(register_users) = &self.config.register_users {
370 for option in register_users.iter() {
371 match self.register(option.clone()).await {
372 Ok(_) => {
373 count += 1;
374 }
375 Err(e) => {
376 warn!("failed to register user: {:?} {:?}", e, option);
377 }
378 }
379 }
380 }
381 Ok(count)
382 }
383
384 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
385 for (_, handle) in self.registration_handles.lock().await.iter_mut() {
386 handle.stop();
387 }
388
389 if let Some(duration) = wait_for_clear {
390 let live_users = self.alive_users.clone();
391 let check_loop = async move {
392 loop {
393 let is_empty = {
394 let users = live_users
395 .read()
396 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
397 users.is_empty()
398 };
399 if is_empty {
400 break;
401 }
402 tokio::time::sleep(Duration::from_millis(50)).await;
403 }
404 Ok::<(), anyhow::Error>(())
405 };
406 match tokio::time::timeout(duration, check_loop).await {
407 Ok(_) => {}
408 Err(e) => {
409 warn!("failed to wait for clear: {}", e);
410 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
411 }
412 }
413 }
414 Ok(())
415 }
416
417 pub async fn register(&self, option: RegisterOption) -> Result<()> {
418 let user = option.aor();
419 let mut server = option.server.clone();
420 if !server.starts_with("sip:") && !server.starts_with("sips:") {
421 server = format!("sip:{}", server);
422 }
423 let sip_server = match rsip::Uri::try_from(server) {
424 Ok(uri) => uri,
425 Err(e) => {
426 warn!("failed to parse server: {} {:?}", e, option.server);
427 return Err(anyhow::anyhow!("failed to parse server: {}", e));
428 }
429 };
430 let cancel_token = self.token.child_token();
431 let handle = RegistrationHandle {
432 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
433 endpoint_inner: self.endpoint.inner.clone(),
434 option,
435 cancel_token,
436 start_time: Mutex::new(std::time::Instant::now()),
437 last_update: Mutex::new(std::time::Instant::now()),
438 last_response: Mutex::new(None),
439 }),
440 };
441 self.registration_handles
442 .lock()
443 .await
444 .insert(user.clone(), handle.clone());
445 tracing::debug!(user = user.as_str(), "starting registration task");
446 let alive_users = self.alive_users.clone();
447
448 crate::spawn(async move {
449 *handle.inner.start_time.lock().await = std::time::Instant::now();
450
451 select! {
452 _ = handle.inner.cancel_token.cancelled() => {
453 }
454 _ = async {
455 loop {
456 let user = handle.inner.option.aor();
457 alive_users.write().unwrap().remove(&user);
458 let refresh_time = match handle.do_register(&sip_server, None).await {
459 Ok(expires) => {
460 info!(
461 user = handle.inner.option.aor(),
462 expires = expires,
463 alive_users = alive_users.read().unwrap().len(),
464 "registration refreshed",
465 );
466 alive_users.write().unwrap().insert(user);
467 expires * 3 / 4 }
469 Err(e) => {
470 warn!(
471 user = handle.inner.option.aor(),
472 alive_users = alive_users.read().unwrap().len(),
473 "registration failed: {:?}", e);
474 60
475 }
476 };
477 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
478 }
479 } => {}
480 }
481 handle.do_register(&sip_server, Some(0)).await.ok();
482 alive_users.write().unwrap().remove(&user);
483 });
484 Ok(())
485 }
486}
487
488impl Drop for AppStateInner {
489 fn drop(&mut self) {
490 self.stop();
491 }
492}
493
494impl AppStateBuilder {
495 pub fn new() -> Self {
496 Self {
497 config: None,
498 stream_engine: None,
499 callrecord_sender: None,
500 callrecord_formatter: None,
501 cancel_token: None,
502 create_invitation_handler: None,
503 config_path: None,
504 message_inspector: None,
505 target_locator: None,
506 transport_inspector: None,
507 }
508 }
509
510 pub fn with_config(mut self, config: Config) -> Self {
511 self.config = Some(config);
512 self
513 }
514
515 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
516 self.stream_engine = Some(stream_engine);
517 self
518 }
519
520 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
521 self.callrecord_sender = Some(sender);
522 self
523 }
524
525 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
526 self.cancel_token = Some(token);
527 self
528 }
529
530 pub fn with_config_metadata(mut self, path: Option<String>) -> Self {
531 self.config_path = path;
532 self
533 }
534
535 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
536 self.message_inspector = Some(inspector);
537 self
538 }
539 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
540 self.target_locator = Some(locator);
541 self
542 }
543
544 pub fn with_transport_inspector(
545 &mut self,
546 inspector: Box<dyn TransportEventInspector>,
547 ) -> &mut Self {
548 self.transport_inspector = Some(inspector);
549 self
550 }
551
552 pub async fn build(self) -> Result<AppState> {
553 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
554 let token = self
555 .cancel_token
556 .unwrap_or_else(|| CancellationToken::new());
557 let _ = set_cache_dir(&config.media_cache_path);
558
559 let local_ip = if !config.addr.is_empty() {
560 std::net::IpAddr::from_str(config.addr.as_str())?
561 } else {
562 crate::net_tool::get_first_non_loopback_interface()?
563 };
564 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
565 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
566
567 let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
568 local_addr,
569 None,
570 Some(token.child_token()),
571 )
572 .await
573 .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
574
575 transport_layer.add_transport(udp_conn.into());
576 info!("start useragent, addr: {}", local_addr);
577
578 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
579 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
580 if let Some(ref user_agent) = config.useragent {
581 endpoint_builder.with_user_agent(user_agent.as_str());
582 }
583
584 let mut endpoint_builder = endpoint_builder
585 .with_cancel_token(token.child_token())
586 .with_transport_layer(transport_layer)
587 .with_option(endpoint_option);
588
589 if let Some(inspector) = self.message_inspector {
590 endpoint_builder = endpoint_builder.with_inspector(inspector);
591 }
592
593 if let Some(locator) = self.target_locator {
594 endpoint_builder.with_target_locator(locator);
595 } else if let Some(ref rules) = config.rewrites {
596 endpoint_builder
597 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
598 }
599
600 if let Some(inspector) = self.transport_inspector {
601 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
602 }
603
604 let endpoint = endpoint_builder.build();
605 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
606
607 let stream_engine = self.stream_engine.unwrap_or_default();
608
609 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
610 formatter
611 } else {
612 let formatter = if let Some(ref callrecord) = config.callrecord {
613 DefaultCallRecordFormatter::new_with_config(callrecord)
614 } else {
615 DefaultCallRecordFormatter::default()
616 };
617 Arc::new(formatter)
618 };
619
620 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
621 Some(sender)
622 } else if let Some(ref callrecord) = config.callrecord {
623 let builder = CallRecordManagerBuilder::new()
624 .with_cancel_token(token.child_token())
625 .with_config(callrecord.clone())
626 .with_max_concurrent(32)
627 .with_formatter(callrecord_formatter.clone());
628
629 let mut callrecord_manager = builder.build();
630 let sender = callrecord_manager.sender.clone();
631 crate::spawn(async move {
632 callrecord_manager.serve().await;
633 });
634 Some(sender)
635 } else {
636 None
637 };
638
639 let app_state = Arc::new(AppStateInner {
640 config,
641 token,
642 stream_engine,
643 callrecord_sender,
644 endpoint,
645 registration_handles: Mutex::new(HashMap::new()),
646 alive_users: Arc::new(RwLock::new(HashSet::new())),
647 dialog_layer: dialog_layer.clone(),
648 create_invitation_handler: self.create_invitation_handler,
649 invitation: Invitation::new(dialog_layer),
650 routing_state: Arc::new(crate::call::RoutingState::new()),
651 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
652 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
653 total_calls: AtomicU64::new(0),
654 total_failed_calls: AtomicU64::new(0),
655 uptime: Local::now(),
656 });
657
658 Ok(app_state)
659 }
660}