1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::FnCreateInvitationHandler,
11 invitation::{PendingDialog, PendingDialogGuard, default_create_invite_handler},
12 registration::RegistrationHandle,
13 },
14};
15
16use crate::media::{cache::set_cache_dir, engine::StreamEngine};
17use anyhow::Result;
18use chrono::{DateTime, Utc};
19use humantime::parse_duration;
20use rsip::prelude::HeadersExt;
21use rsipstack::transaction::{
22 Endpoint, TransactionReceiver,
23 endpoint::{TargetLocator, TransportEventInspector},
24};
25use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
26use std::collections::HashSet;
27use std::str::FromStr;
28use std::sync::{Arc, RwLock};
29use std::time::Duration;
30use std::{collections::HashMap, net::SocketAddr};
31use std::{path::Path, sync::atomic::AtomicU64};
32use tokio::select;
33use tokio::sync::Mutex;
34use tokio_util::sync::CancellationToken;
35use tracing::{info, warn};
36
/// Shared runtime state of the SIP user agent.
///
/// Instances are created by [`AppStateBuilder::build`] and handed out behind an
/// `Arc` (see the `AppState` alias).
pub struct AppStateInner {
    /// Application configuration the state was built from.
    pub config: Arc<Config>,
    /// Root cancellation token; `stop()` cancels it and per-task child tokens are derived from it.
    pub token: CancellationToken,
    /// Media stream engine; the builder falls back to the default engine when none is supplied.
    pub stream_engine: Arc<StreamEngine>,
    /// Sender for call records, present when one was supplied or `config.callrecord` is set.
    pub callrecord_sender: Option<CallRecordSender>,
    /// SIP endpoint that yields the incoming transactions processed by `serve()`.
    pub endpoint: Endpoint,
    /// Registration handles keyed by the address-of-record returned by `RegisterOption::aor()`.
    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
    /// Addresses-of-record whose most recent REGISTER succeeded.
    pub alive_users: Arc<RwLock<HashSet<String>>>,
    /// Dialog layer used to match in-dialog requests and to create server invite dialogs.
    pub dialog_layer: Arc<DialogLayer>,
    /// Optional factory for the invite handler; the default factory is used when `None`.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Invitation registry handed to every `PendingDialogGuard`.
    pub invitation: Invitation,
    /// Routing state passed to the invite handler for each incoming INVITE.
    pub routing_state: Arc<crate::call::RoutingState>,
    /// Starts empty; not read or written in this file.
    /// NOTE(review): presumably maps a session id to a playbook name; confirm against the users.
    pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,

    /// Starts empty; not read or written in this file.
    /// NOTE(review): presumably keyed by session id; confirm against the users.
    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
    /// Call counter, initialised to zero by the builder.
    pub total_calls: AtomicU64,
    /// Failed-call counter, initialised to zero by the builder.
    pub total_failed_calls: AtomicU64,
}
55
/// Reference-counted handle to the shared [`AppStateInner`].
pub type AppState = Arc<AppStateInner>;
57
/// Collects the optional parts of an [`AppStateInner`] before `build()` assembles it.
///
/// Every field is optional; `build()` substitutes a default for anything left unset.
pub struct AppStateBuilder {
    /// Configuration; `Config::default()` is used when unset.
    pub config: Option<Config>,
    /// Media stream engine; the default engine is used when unset.
    pub stream_engine: Option<Arc<StreamEngine>>,
    /// Pre-built call record sender; when unset one is created from `config.callrecord`, if any.
    pub callrecord_sender: Option<CallRecordSender>,
    /// Formatter handed to the call record manager that `build()` creates.
    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
    /// Root cancellation token; a fresh token is created when unset.
    pub cancel_token: Option<CancellationToken>,
    /// Optional factory for the invite handler, copied into the built state.
    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
    /// Time the configuration was loaded; recorded by `with_config` / `with_config_metadata`.
    /// Not consumed by `build()`.
    pub config_loaded_at: Option<DateTime<Utc>>,
    /// Path the configuration was loaded from; recorded by `with_config_metadata`.
    /// Not consumed by `build()`.
    pub config_path: Option<String>,

    /// Message inspector installed on the SIP endpoint.
    pub message_inspector: Option<Box<dyn MessageInspector>>,
    /// Target locator installed on the SIP endpoint; takes precedence over `config.rewrites`.
    pub target_locator: Option<Box<dyn TargetLocator>>,
    /// Transport event inspector installed on the SIP endpoint.
    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
}
72
73impl AppStateInner {
74 pub fn get_dump_events_file(&self, session_id: &String) -> String {
75 let recorder_root = self.config.recorder_path();
76 let root = Path::new(&recorder_root);
77 if !root.exists() {
78 match std::fs::create_dir_all(root) {
79 Ok(_) => {
80 info!("created dump events root: {}", root.to_string_lossy());
81 }
82 Err(e) => {
83 warn!(
84 "Failed to create dump events root: {} {}",
85 e,
86 root.to_string_lossy()
87 );
88 }
89 }
90 }
91 root.join(format!("{}.events.jsonl", session_id))
92 .to_string_lossy()
93 .to_string()
94 }
95
96 pub fn get_recorder_file(&self, session_id: &String) -> String {
97 let recorder_root = self.config.recorder_path();
98 let root = Path::new(&recorder_root);
99 if !root.exists() {
100 match std::fs::create_dir_all(root) {
101 Ok(_) => {
102 info!("created recorder root: {}", root.to_string_lossy());
103 }
104 Err(e) => {
105 warn!(
106 "Failed to create recorder root: {} {}",
107 e,
108 root.to_string_lossy()
109 );
110 }
111 }
112 }
113 let mut recorder_file = root.join(session_id);
114 let desired_ext = self.config.recorder_format().extension();
115 let has_desired_ext = recorder_file
116 .extension()
117 .and_then(|ext| ext.to_str())
118 .map(|ext| ext.eq_ignore_ascii_case(desired_ext))
119 .unwrap_or(false);
120
121 if !has_desired_ext {
122 recorder_file.set_extension(desired_ext);
124 }
125
126 recorder_file.to_string_lossy().to_string()
127 }
128
129 pub async fn serve(self: Arc<Self>) -> Result<()> {
130 let incoming_txs = self.endpoint.incoming_transactions()?;
131 let token = self.token.child_token();
132 let endpoint_inner = self.endpoint.inner.clone();
133 let dialog_layer = self.dialog_layer.clone();
134
135 match self.start_registration().await {
136 Ok(count) => {
137 info!("registration started, count: {}", count);
138 }
139 Err(e) => {
140 warn!("failed to start registration: {:?}", e);
141 }
142 }
143
144 tokio::select! {
145 _ = token.cancelled() => {
146 info!("cancelled");
147 }
148 result = endpoint_inner.serve() => {
149 if let Err(e) = result {
150 info!("endpoint serve error: {:?}", e);
151 }
152 }
153 result = self.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
154 if let Err(e) = result {
155 info!("process incoming request error: {:?}", e);
156 }
157 },
158 }
159
160 let timeout = self
163 .config
164 .graceful_shutdown
165 .map(|_| Duration::from_secs(50));
166
167 match self.stop_registration(timeout).await {
168 Ok(_) => {
169 info!("registration stopped, waiting for clear");
170 }
171 Err(e) => {
172 warn!("failed to stop registration: {:?}", e);
173 }
174 }
175 info!("stopping");
176 Ok(())
177 }
178
179 async fn process_incoming_request(
180 &self,
181 dialog_layer: Arc<DialogLayer>,
182 mut incoming: TransactionReceiver,
183 ) -> Result<()> {
184 while let Some(mut tx) = incoming.recv().await {
185 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
186 info!(?key, "received transaction");
187 if tx.original.to_header()?.tag()?.as_ref().is_some() {
188 match dialog_layer.match_dialog(&tx.original) {
189 Some(mut d) => {
190 tokio::spawn(async move {
191 match d.handle(&mut tx).await {
192 Ok(_) => (),
193 Err(e) => {
194 info!("error handling transaction: {:?}", e);
195 }
196 }
197 });
198 continue;
199 }
200 None => {
201 info!("dialog not found: {}", tx.original);
202 match tx
203 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
204 .await
205 {
206 Ok(_) => (),
207 Err(e) => {
208 info!("error replying to request: {:?}", e);
209 }
210 }
211 continue;
212 }
213 }
214 }
215 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
217 match tx.original.method {
218 rsip::Method::Invite | rsip::Method::Ack => {
219 let invitation_handler = match self.create_invitation_handler {
220 Some(ref create_invitation_handler) => {
221 create_invitation_handler(self.config.handler.as_ref()).ok()
222 }
223 _ => default_create_invite_handler(self.config.handler.as_ref()),
224 };
225 let invitation_handler = match invitation_handler {
226 Some(h) => h,
227 None => {
228 info!(?key, "no invite handler configured, rejecting INVITE");
229 match tx
230 .reply_with(
231 rsip::StatusCode::ServiceUnavailable,
232 vec![rsip::Header::Other(
233 "Reason".into(),
234 "SIP;cause=503;text=\"No invite handler configured\""
235 .into(),
236 )],
237 None,
238 )
239 .await
240 {
241 Ok(_) => (),
242 Err(e) => {
243 info!("error replying to request: {:?}", e);
244 }
245 }
246 continue;
247 }
248 };
249 let contact = dialog_layer
250 .endpoint
251 .get_addrs()
252 .first()
253 .map(|addr| rsip::Uri {
254 scheme: Some(rsip::Scheme::Sip),
255 auth: None,
256 host_with_port: addr.addr.clone(),
257 params: vec![],
258 headers: vec![],
259 });
260
261 let dialog = match dialog_layer.get_or_create_server_invite(
262 &tx,
263 state_sender,
264 None,
265 contact,
266 ) {
267 Ok(d) => d,
268 Err(e) => {
269 info!("failed to obtain dialog: {:?}", e);
271 match tx
272 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
273 .await
274 {
275 Ok(_) => (),
276 Err(e) => {
277 info!("error replying to request: {:?}", e);
278 }
279 }
280 continue;
281 }
282 };
283
284 let dialog_id_str = dialog.id().to_string();
285 let token = self.token.child_token();
286 let pending_dialog = PendingDialog {
287 token: token.clone(),
288 dialog: dialog.clone(),
289 state_receiver,
290 };
291
292 let guard = Arc::new(PendingDialogGuard::new(
293 self.invitation.clone(),
294 dialog_id_str.clone(),
295 pending_dialog,
296 ));
297
298 let accept_timeout = self
299 .config
300 .accept_timeout
301 .as_ref()
302 .and_then(|t| parse_duration(t).ok())
303 .unwrap_or_else(|| Duration::from_secs(60));
304
305 let token_ref = token.clone();
306 let guard_ref = guard.clone();
307 tokio::spawn(async move {
308 select! {
309 _ = token_ref.cancelled() => {}
310 _ = tokio::time::sleep(accept_timeout) => {}
311 }
312 guard_ref.drop_async().await;
313 });
314
315 let mut dialog_ref = dialog.clone();
316 let token_ref = token.clone();
317 let routing_state = self.routing_state.clone();
318 tokio::spawn(async move {
319 let invite_loop = async {
320 match invitation_handler
321 .on_invite(
322 dialog_id_str.clone(),
323 token,
324 dialog.clone(),
325 routing_state,
326 )
327 .await
328 {
329 Ok(_) => (),
330 Err(e) => {
331 info!(id = dialog_id_str, "error handling invite: {:?}", e);
332 guard.drop_async().await;
333 }
334 }
335 };
336 select! {
337 _ = token_ref.cancelled() => {}
338 _ = async {
339 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
340 } => {}
341 }
342 });
343 }
344 rsip::Method::Options => {
345 info!(?key, "ignoring out-of-dialog OPTIONS request");
346 continue;
347 }
348 _ => {
349 info!(?key, "received request: {:?}", tx.original.method);
350 match tx.reply(rsip::StatusCode::OK).await {
351 Ok(_) => (),
352 Err(e) => {
353 info!("error replying to request: {:?}", e);
354 }
355 }
356 }
357 }
358 }
359 Ok(())
360 }
361
    /// Requests shutdown by cancelling the root token.
    ///
    /// The tokens used by `serve()`, the registrations and the pending dialogs are
    /// children of this token, so they are cancelled along with it.
    pub fn stop(&self) {
        info!("stopping");
        self.token.cancel();
    }
366
367 pub async fn start_registration(&self) -> Result<usize> {
368 let mut count = 0;
369 if let Some(register_users) = &self.config.register_users {
370 for option in register_users.iter() {
371 match self.register(option.clone()).await {
372 Ok(_) => {
373 count += 1;
374 }
375 Err(e) => {
376 warn!("failed to register user: {:?} {:?}", e, option);
377 }
378 }
379 }
380 }
381 Ok(count)
382 }
383
384 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
385 for (_, handle) in self.registration_handles.lock().await.iter_mut() {
386 handle.stop();
387 }
388
389 if let Some(duration) = wait_for_clear {
390 let live_users = self.alive_users.clone();
391 let check_loop = async move {
392 loop {
393 let is_empty = {
394 let users = live_users
395 .read()
396 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
397 users.is_empty()
398 };
399 if is_empty {
400 break;
401 }
402 tokio::time::sleep(Duration::from_millis(50)).await;
403 }
404 Ok::<(), anyhow::Error>(())
405 };
406 match tokio::time::timeout(duration, check_loop).await {
407 Ok(_) => {}
408 Err(e) => {
409 warn!("failed to wait for clear: {}", e);
410 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
411 }
412 }
413 }
414 Ok(())
415 }
416
417 pub async fn register(&self, option: RegisterOption) -> Result<()> {
418 let user = option.aor();
419 let mut server = option.server.clone();
420 if !server.starts_with("sip:") && !server.starts_with("sips:") {
421 server = format!("sip:{}", server);
422 }
423 let sip_server = match rsip::Uri::try_from(server) {
424 Ok(uri) => uri,
425 Err(e) => {
426 warn!("failed to parse server: {} {:?}", e, option.server);
427 return Err(anyhow::anyhow!("failed to parse server: {}", e));
428 }
429 };
430 let cancel_token = self.token.child_token();
431 let handle = RegistrationHandle {
432 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
433 endpoint_inner: self.endpoint.inner.clone(),
434 option,
435 cancel_token,
436 start_time: Mutex::new(std::time::Instant::now()),
437 last_update: Mutex::new(std::time::Instant::now()),
438 last_response: Mutex::new(None),
439 }),
440 };
441 self.registration_handles
442 .lock()
443 .await
444 .insert(user.clone(), handle.clone());
445 let alive_users = self.alive_users.clone();
446
447 tokio::spawn(async move {
448 *handle.inner.start_time.lock().await = std::time::Instant::now();
449
450 select! {
451 _ = handle.inner.cancel_token.cancelled() => {
452 }
453 _ = async {
454 loop {
455 let user = handle.inner.option.aor();
456 alive_users.write().unwrap().remove(&user);
457 let refresh_time = match handle.do_register(&sip_server, None).await {
458 Ok(expires) => {
459 info!(
460 user = handle.inner.option.aor(),
461 expires = expires,
462 alive_users = alive_users.read().unwrap().len(),
463 "registration refreshed",
464 );
465 alive_users.write().unwrap().insert(user);
466 expires * 3 / 4 }
468 Err(e) => {
469 warn!(
470 user = handle.inner.option.aor(),
471 alive_users = alive_users.read().unwrap().len(),
472 "registration failed: {:?}", e);
473 60
474 }
475 };
476 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
477 }
478 } => {}
479 }
480 handle.do_register(&sip_server, Some(0)).await.ok();
481 alive_users.write().unwrap().remove(&user);
482 });
483 Ok(())
484 }
485}
486
/// Dropping the state cancels its root token via `stop()`.
impl Drop for AppStateInner {
    fn drop(&mut self) {
        self.stop();
    }
}
492
493impl AppStateBuilder {
    /// Creates a builder with every option unset.
    pub fn new() -> Self {
        Self {
            config: None,
            stream_engine: None,
            callrecord_sender: None,
            callrecord_formatter: None,
            cancel_token: None,
            create_invitation_handler: None,
            config_loaded_at: None,
            config_path: None,
            message_inspector: None,
            target_locator: None,
            transport_inspector: None,
        }
    }
509
510 pub fn with_config(mut self, mut config: Config) -> Self {
511 if config.ensure_recording_defaults() {
512 warn!(
513 "recorder_format=ogg requires compiling with the 'opus' feature; falling back to wav"
514 );
515 }
516 self.config = Some(config);
517 if self.config_loaded_at.is_none() {
518 self.config_loaded_at = Some(Utc::now());
519 }
520 self
521 }
522
    /// Uses `stream_engine` instead of the default media stream engine.
    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
        self.stream_engine = Some(stream_engine);
        self
    }
527
    /// Uses `sender` for call records, so `build()` does not create a call record manager.
    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
        self.callrecord_sender = Some(sender);
        self
    }
532
    /// Uses `token` as the root cancellation token of the built state.
    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
        self.cancel_token = Some(token);
        self
    }
537
    /// Records where and when the configuration was loaded.
    ///
    /// Overwrites any load time previously set, including the one `with_config` stamps.
    pub fn with_config_metadata(mut self, path: Option<String>, loaded_at: DateTime<Utc>) -> Self {
        self.config_path = path;
        self.config_loaded_at = Some(loaded_at);
        self
    }
543
    /// Sets the message inspector installed on the SIP endpoint.
    ///
    /// Takes and returns `&mut Self`, unlike the by-value setters of this builder.
    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
        self.message_inspector = Some(inspector);
        self
    }
    /// Sets the target locator installed on the SIP endpoint.
    ///
    /// When set, `build()` does not create a locator from `config.rewrites`.
    /// Takes and returns `&mut Self`, unlike the by-value setters of this builder.
    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
        self.target_locator = Some(locator);
        self
    }
552
    /// Sets the transport event inspector installed on the SIP endpoint.
    ///
    /// Takes and returns `&mut Self`, unlike the by-value setters of this builder.
    pub fn with_transport_inspector(
        &mut self,
        inspector: Box<dyn TransportEventInspector>,
    ) -> &mut Self {
        self.transport_inspector = Some(inspector);
        self
    }
560
561 pub async fn build(self) -> Result<AppState> {
562 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
563 let token = self
564 .cancel_token
565 .unwrap_or_else(|| CancellationToken::new());
566 let _ = set_cache_dir(&config.media_cache_path);
567
568 let local_ip = if !config.addr.is_empty() {
569 std::net::IpAddr::from_str(config.addr.as_str())?
570 } else {
571 crate::net_tool::get_first_non_loopback_interface()?
572 };
573 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
574 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
575
576 let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
577 local_addr,
578 None,
579 Some(token.child_token()),
580 )
581 .await
582 .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
583
584 transport_layer.add_transport(udp_conn.into());
585 info!("start useragent, addr: {}", local_addr);
586
587 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
588 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
589 if let Some(ref user_agent) = config.useragent {
590 endpoint_builder.with_user_agent(user_agent.as_str());
591 }
592
593 let mut endpoint_builder = endpoint_builder
594 .with_cancel_token(token.child_token())
595 .with_transport_layer(transport_layer)
596 .with_option(endpoint_option);
597
598 if let Some(inspector) = self.message_inspector {
599 endpoint_builder = endpoint_builder.with_inspector(inspector);
600 }
601
602 if let Some(locator) = self.target_locator {
603 endpoint_builder.with_target_locator(locator);
604 } else if let Some(ref rules) = config.rewrites {
605 endpoint_builder
606 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
607 }
608
609 if let Some(inspector) = self.transport_inspector {
610 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
611 }
612
613 let endpoint = endpoint_builder.build();
614 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
615
616 let stream_engine = self.stream_engine.unwrap_or_default();
617
618 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
619 formatter
620 } else {
621 let formatter = if let Some(ref callrecord) = config.callrecord {
622 DefaultCallRecordFormatter::new_with_config(callrecord)
623 } else {
624 DefaultCallRecordFormatter::default()
625 };
626 Arc::new(formatter)
627 };
628
629 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
630 Some(sender)
631 } else if let Some(ref callrecord) = config.callrecord {
632 let builder = CallRecordManagerBuilder::new()
633 .with_cancel_token(token.child_token())
634 .with_config(callrecord.clone())
635 .with_max_concurrent(32)
636 .with_formatter(callrecord_formatter.clone());
637
638 let mut callrecord_manager = builder.build();
639 let sender = callrecord_manager.sender.clone();
640 tokio::spawn(async move {
641 callrecord_manager.serve().await;
642 });
643 Some(sender)
644 } else {
645 None
646 };
647
648 let app_state = Arc::new(AppStateInner {
649 config,
650 token,
651 stream_engine,
652 callrecord_sender,
653 endpoint,
654 registration_handles: Mutex::new(HashMap::new()),
655 alive_users: Arc::new(RwLock::new(HashSet::new())),
656 dialog_layer: dialog_layer.clone(),
657 create_invitation_handler: self.create_invitation_handler,
658 invitation: Invitation::new(dialog_layer),
659 routing_state: Arc::new(crate::call::RoutingState::new()),
660 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
661 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
662 total_calls: AtomicU64::new(0),
663 total_failed_calls: AtomicU64::new(0),
664 });
665
666 Ok(app_state)
667 }
668}