1use crate::{
2 call::{ActiveCallRef, sip::Invitation},
3 callrecord::{
4 CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5 },
6 config::Config,
7 locator::RewriteTargetLocator,
8 useragent::{
9 RegisterOption,
10 invitation::FnCreateInvitationHandler,
11 invitation::{PendingDialog, PendingDialogGuard, default_create_invite_handler},
12 registration::RegistrationHandle,
13 },
14};
15
16use crate::media::{cache::set_cache_dir, engine::StreamEngine};
17use anyhow::Result;
18use chrono::{DateTime, Utc};
19use humantime::parse_duration;
20use rsip::prelude::HeadersExt;
21use rsipstack::transaction::{
22 Endpoint, TransactionReceiver,
23 endpoint::{TargetLocator, TransportEventInspector},
24};
25use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
26use std::collections::HashSet;
27use std::str::FromStr;
28use std::sync::{Arc, RwLock};
29use std::time::Duration;
30use std::{collections::HashMap, net::SocketAddr};
31use std::{path::Path, sync::atomic::AtomicU64};
32use tokio::select;
33use tokio::sync::Mutex;
34use tokio_util::sync::CancellationToken;
35use tracing::{info, warn};
36
37pub struct AppStateInner {
38 pub config: Arc<Config>,
39 pub token: CancellationToken,
40 pub stream_engine: Arc<StreamEngine>,
41 pub callrecord_sender: Option<CallRecordSender>,
42 pub endpoint: Endpoint,
43 pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
44 pub alive_users: Arc<RwLock<HashSet<String>>>,
45 pub dialog_layer: Arc<DialogLayer>,
46 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
47 pub invitation: Invitation,
48 pub routing_state: Arc<crate::call::RoutingState>,
49 pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
50
51 pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
52 pub total_calls: AtomicU64,
53 pub total_failed_calls: AtomicU64,
54}
55
56pub type AppState = Arc<AppStateInner>;
57
58pub struct AppStateBuilder {
59 pub config: Option<Config>,
60 pub stream_engine: Option<Arc<StreamEngine>>,
61 pub callrecord_sender: Option<CallRecordSender>,
62 pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
63 pub cancel_token: Option<CancellationToken>,
64 pub create_invitation_handler: Option<FnCreateInvitationHandler>,
65 pub config_loaded_at: Option<DateTime<Utc>>,
66 pub config_path: Option<String>,
67
68 pub message_inspector: Option<Box<dyn MessageInspector>>,
69 pub target_locator: Option<Box<dyn TargetLocator>>,
70 pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
71}
72
73impl AppStateInner {
74 pub fn get_dump_events_file(&self, session_id: &String) -> String {
75 let recorder_root = self.config.recorder_path();
76 let root = Path::new(&recorder_root);
77 if !root.exists() {
78 match std::fs::create_dir_all(root) {
79 Ok(_) => {
80 info!("created dump events root: {}", root.to_string_lossy());
81 }
82 Err(e) => {
83 warn!(
84 "Failed to create dump events root: {} {}",
85 e,
86 root.to_string_lossy()
87 );
88 }
89 }
90 }
91 root.join(format!("{}.events.jsonl", session_id))
92 .to_string_lossy()
93 .to_string()
94 }
95
96 pub fn get_recorder_file(&self, session_id: &String) -> String {
97 let recorder_root = self.config.recorder_path();
98 let root = Path::new(&recorder_root);
99 if !root.exists() {
100 match std::fs::create_dir_all(root) {
101 Ok(_) => {
102 info!("created recorder root: {}", root.to_string_lossy());
103 }
104 Err(e) => {
105 warn!(
106 "Failed to create recorder root: {} {}",
107 e,
108 root.to_string_lossy()
109 );
110 }
111 }
112 }
113 let desired_ext = self.config.recorder_format().extension();
114 let mut filename = session_id.clone();
115 if !filename
116 .to_lowercase()
117 .ends_with(&format!(".{}", desired_ext.to_lowercase()))
118 {
119 filename = format!("{}.{}", filename, desired_ext);
120 }
121 root.join(filename).to_string_lossy().to_string()
122 }
123
124 pub async fn serve(self: Arc<Self>) -> Result<()> {
125 let incoming_txs = self.endpoint.incoming_transactions()?;
126 let token = self.token.child_token();
127 let endpoint_inner = self.endpoint.inner.clone();
128 let dialog_layer = self.dialog_layer.clone();
129
130 match self.start_registration().await {
131 Ok(count) => {
132 info!("registration started, count: {}", count);
133 }
134 Err(e) => {
135 warn!("failed to start registration: {:?}", e);
136 }
137 }
138
139 tokio::select! {
140 _ = token.cancelled() => {
141 info!("cancelled");
142 }
143 result = endpoint_inner.serve() => {
144 if let Err(e) = result {
145 info!("endpoint serve error: {:?}", e);
146 }
147 }
148 result = self.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
149 if let Err(e) = result {
150 info!("process incoming request error: {:?}", e);
151 }
152 },
153 }
154
155 let timeout = self
158 .config
159 .graceful_shutdown
160 .map(|_| Duration::from_secs(50));
161
162 match self.stop_registration(timeout).await {
163 Ok(_) => {
164 info!("registration stopped, waiting for clear");
165 }
166 Err(e) => {
167 warn!("failed to stop registration: {:?}", e);
168 }
169 }
170 info!("stopping");
171 Ok(())
172 }
173
174 async fn process_incoming_request(
175 &self,
176 dialog_layer: Arc<DialogLayer>,
177 mut incoming: TransactionReceiver,
178 ) -> Result<()> {
179 while let Some(mut tx) = incoming.recv().await {
180 let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
181 info!(?key, "received transaction");
182 if tx.original.to_header()?.tag()?.as_ref().is_some() {
183 match dialog_layer.match_dialog(&tx.original) {
184 Some(mut d) => {
185 crate::spawn(async move {
186 match d.handle(&mut tx).await {
187 Ok(_) => (),
188 Err(e) => {
189 info!("error handling transaction: {:?}", e);
190 }
191 }
192 });
193 continue;
194 }
195 None => {
196 info!("dialog not found: {}", tx.original);
197 match tx
198 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
199 .await
200 {
201 Ok(_) => (),
202 Err(e) => {
203 info!("error replying to request: {:?}", e);
204 }
205 }
206 continue;
207 }
208 }
209 }
210 let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
212 match tx.original.method {
213 rsip::Method::Invite | rsip::Method::Ack => {
214 let invitation_handler = match self.create_invitation_handler {
215 Some(ref create_invitation_handler) => {
216 create_invitation_handler(self.config.handler.as_ref()).ok()
217 }
218 _ => default_create_invite_handler(self.config.handler.as_ref()),
219 };
220 let invitation_handler = match invitation_handler {
221 Some(h) => h,
222 None => {
223 info!(?key, "no invite handler configured, rejecting INVITE");
224 match tx
225 .reply_with(
226 rsip::StatusCode::ServiceUnavailable,
227 vec![rsip::Header::Other(
228 "Reason".into(),
229 "SIP;cause=503;text=\"No invite handler configured\""
230 .into(),
231 )],
232 None,
233 )
234 .await
235 {
236 Ok(_) => (),
237 Err(e) => {
238 info!("error replying to request: {:?}", e);
239 }
240 }
241 continue;
242 }
243 };
244 let contact = dialog_layer
245 .endpoint
246 .get_addrs()
247 .first()
248 .map(|addr| rsip::Uri {
249 scheme: Some(rsip::Scheme::Sip),
250 auth: None,
251 host_with_port: addr.addr.clone(),
252 params: vec![],
253 headers: vec![],
254 });
255
256 let dialog = match dialog_layer.get_or_create_server_invite(
257 &tx,
258 state_sender,
259 None,
260 contact,
261 ) {
262 Ok(d) => d,
263 Err(e) => {
264 info!("failed to obtain dialog: {:?}", e);
266 match tx
267 .reply(rsip::StatusCode::CallTransactionDoesNotExist)
268 .await
269 {
270 Ok(_) => (),
271 Err(e) => {
272 info!("error replying to request: {:?}", e);
273 }
274 }
275 continue;
276 }
277 };
278
279 let dialog_id_str = dialog.id().to_string();
280 let token = self.token.child_token();
281 let pending_dialog = PendingDialog {
282 token: token.clone(),
283 dialog: dialog.clone(),
284 state_receiver,
285 };
286
287 let guard = Arc::new(PendingDialogGuard::new(
288 self.invitation.clone(),
289 dialog_id_str.clone(),
290 pending_dialog,
291 ));
292
293 let accept_timeout = self
294 .config
295 .accept_timeout
296 .as_ref()
297 .and_then(|t| parse_duration(t).ok())
298 .unwrap_or_else(|| Duration::from_secs(60));
299
300 let token_ref = token.clone();
301 let guard_ref = guard.clone();
302 crate::spawn(async move {
303 select! {
304 _ = token_ref.cancelled() => {}
305 _ = tokio::time::sleep(accept_timeout) => {}
306 }
307 guard_ref.drop_async().await;
308 });
309
310 let mut dialog_ref = dialog.clone();
311 let token_ref = token.clone();
312 let routing_state = self.routing_state.clone();
313 crate::spawn(async move {
314 let invite_loop = async {
315 match invitation_handler
316 .on_invite(
317 dialog_id_str.clone(),
318 token,
319 dialog.clone(),
320 routing_state,
321 )
322 .await
323 {
324 Ok(_) => (),
325 Err(e) => {
326 info!(id = dialog_id_str, "error handling invite: {:?}", e);
327 guard.drop_async().await;
328 }
329 }
330 };
331 select! {
332 _ = token_ref.cancelled() => {}
333 _ = async {
334 let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
335 } => {}
336 }
337 });
338 }
339 rsip::Method::Options => {
340 info!(?key, "ignoring out-of-dialog OPTIONS request");
341 continue;
342 }
343 _ => {
344 info!(?key, "received request: {:?}", tx.original.method);
345 match tx.reply(rsip::StatusCode::OK).await {
346 Ok(_) => (),
347 Err(e) => {
348 info!("error replying to request: {:?}", e);
349 }
350 }
351 }
352 }
353 }
354 Ok(())
355 }
356
357 pub fn stop(&self) {
358 info!("stopping");
359 self.token.cancel();
360 }
361
362 pub async fn start_registration(&self) -> Result<usize> {
363 let mut count = 0;
364 if let Some(register_users) = &self.config.register_users {
365 for option in register_users.iter() {
366 match self.register(option.clone()).await {
367 Ok(_) => {
368 count += 1;
369 }
370 Err(e) => {
371 warn!("failed to register user: {:?} {:?}", e, option);
372 }
373 }
374 }
375 }
376 Ok(count)
377 }
378
379 pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
380 for (_, handle) in self.registration_handles.lock().await.iter_mut() {
381 handle.stop();
382 }
383
384 if let Some(duration) = wait_for_clear {
385 let live_users = self.alive_users.clone();
386 let check_loop = async move {
387 loop {
388 let is_empty = {
389 let users = live_users
390 .read()
391 .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
392 users.is_empty()
393 };
394 if is_empty {
395 break;
396 }
397 tokio::time::sleep(Duration::from_millis(50)).await;
398 }
399 Ok::<(), anyhow::Error>(())
400 };
401 match tokio::time::timeout(duration, check_loop).await {
402 Ok(_) => {}
403 Err(e) => {
404 warn!("failed to wait for clear: {}", e);
405 return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
406 }
407 }
408 }
409 Ok(())
410 }
411
412 pub async fn register(&self, option: RegisterOption) -> Result<()> {
413 let user = option.aor();
414 let mut server = option.server.clone();
415 if !server.starts_with("sip:") && !server.starts_with("sips:") {
416 server = format!("sip:{}", server);
417 }
418 let sip_server = match rsip::Uri::try_from(server) {
419 Ok(uri) => uri,
420 Err(e) => {
421 warn!("failed to parse server: {} {:?}", e, option.server);
422 return Err(anyhow::anyhow!("failed to parse server: {}", e));
423 }
424 };
425 let cancel_token = self.token.child_token();
426 let handle = RegistrationHandle {
427 inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
428 endpoint_inner: self.endpoint.inner.clone(),
429 option,
430 cancel_token,
431 start_time: Mutex::new(std::time::Instant::now()),
432 last_update: Mutex::new(std::time::Instant::now()),
433 last_response: Mutex::new(None),
434 }),
435 };
436 self.registration_handles
437 .lock()
438 .await
439 .insert(user.clone(), handle.clone());
440 let alive_users = self.alive_users.clone();
441
442 crate::spawn(async move {
443 *handle.inner.start_time.lock().await = std::time::Instant::now();
444
445 select! {
446 _ = handle.inner.cancel_token.cancelled() => {
447 }
448 _ = async {
449 loop {
450 let user = handle.inner.option.aor();
451 alive_users.write().unwrap().remove(&user);
452 let refresh_time = match handle.do_register(&sip_server, None).await {
453 Ok(expires) => {
454 info!(
455 user = handle.inner.option.aor(),
456 expires = expires,
457 alive_users = alive_users.read().unwrap().len(),
458 "registration refreshed",
459 );
460 alive_users.write().unwrap().insert(user);
461 expires * 3 / 4 }
463 Err(e) => {
464 warn!(
465 user = handle.inner.option.aor(),
466 alive_users = alive_users.read().unwrap().len(),
467 "registration failed: {:?}", e);
468 60
469 }
470 };
471 tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
472 }
473 } => {}
474 }
475 handle.do_register(&sip_server, Some(0)).await.ok();
476 alive_users.write().unwrap().remove(&user);
477 });
478 Ok(())
479 }
480}
481
482impl Drop for AppStateInner {
483 fn drop(&mut self) {
484 self.stop();
485 }
486}
487
488impl AppStateBuilder {
489 pub fn new() -> Self {
490 Self {
491 config: None,
492 stream_engine: None,
493 callrecord_sender: None,
494 callrecord_formatter: None,
495 cancel_token: None,
496 create_invitation_handler: None,
497 config_loaded_at: None,
498 config_path: None,
499 message_inspector: None,
500 target_locator: None,
501 transport_inspector: None,
502 }
503 }
504
505 pub fn with_config(mut self, config: Config) -> Self {
506 self.config = Some(config);
507 if self.config_loaded_at.is_none() {
508 self.config_loaded_at = Some(Utc::now());
509 }
510 self
511 }
512
513 pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
514 self.stream_engine = Some(stream_engine);
515 self
516 }
517
518 pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
519 self.callrecord_sender = Some(sender);
520 self
521 }
522
523 pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
524 self.cancel_token = Some(token);
525 self
526 }
527
528 pub fn with_config_metadata(mut self, path: Option<String>, loaded_at: DateTime<Utc>) -> Self {
529 self.config_path = path;
530 self.config_loaded_at = Some(loaded_at);
531 self
532 }
533
534 pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
535 self.message_inspector = Some(inspector);
536 self
537 }
538 pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
539 self.target_locator = Some(locator);
540 self
541 }
542
543 pub fn with_transport_inspector(
544 &mut self,
545 inspector: Box<dyn TransportEventInspector>,
546 ) -> &mut Self {
547 self.transport_inspector = Some(inspector);
548 self
549 }
550
551 pub async fn build(self) -> Result<AppState> {
552 let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
553 let token = self
554 .cancel_token
555 .unwrap_or_else(|| CancellationToken::new());
556 let _ = set_cache_dir(&config.media_cache_path);
557
558 let local_ip = if !config.addr.is_empty() {
559 std::net::IpAddr::from_str(config.addr.as_str())?
560 } else {
561 crate::net_tool::get_first_non_loopback_interface()?
562 };
563 let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
564 let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
565
566 let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
567 local_addr,
568 None,
569 Some(token.child_token()),
570 )
571 .await
572 .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
573
574 transport_layer.add_transport(udp_conn.into());
575 info!("start useragent, addr: {}", local_addr);
576
577 let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
578 let mut endpoint_builder = rsipstack::EndpointBuilder::new();
579 if let Some(ref user_agent) = config.useragent {
580 endpoint_builder.with_user_agent(user_agent.as_str());
581 }
582
583 let mut endpoint_builder = endpoint_builder
584 .with_cancel_token(token.child_token())
585 .with_transport_layer(transport_layer)
586 .with_option(endpoint_option);
587
588 if let Some(inspector) = self.message_inspector {
589 endpoint_builder = endpoint_builder.with_inspector(inspector);
590 }
591
592 if let Some(locator) = self.target_locator {
593 endpoint_builder.with_target_locator(locator);
594 } else if let Some(ref rules) = config.rewrites {
595 endpoint_builder
596 .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
597 }
598
599 if let Some(inspector) = self.transport_inspector {
600 endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
601 }
602
603 let endpoint = endpoint_builder.build();
604 let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
605
606 let stream_engine = self.stream_engine.unwrap_or_default();
607
608 let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
609 formatter
610 } else {
611 let formatter = if let Some(ref callrecord) = config.callrecord {
612 DefaultCallRecordFormatter::new_with_config(callrecord)
613 } else {
614 DefaultCallRecordFormatter::default()
615 };
616 Arc::new(formatter)
617 };
618
619 let callrecord_sender = if let Some(sender) = self.callrecord_sender {
620 Some(sender)
621 } else if let Some(ref callrecord) = config.callrecord {
622 let builder = CallRecordManagerBuilder::new()
623 .with_cancel_token(token.child_token())
624 .with_config(callrecord.clone())
625 .with_max_concurrent(32)
626 .with_formatter(callrecord_formatter.clone());
627
628 let mut callrecord_manager = builder.build();
629 let sender = callrecord_manager.sender.clone();
630 crate::spawn(async move {
631 callrecord_manager.serve().await;
632 });
633 Some(sender)
634 } else {
635 None
636 };
637
638 let app_state = Arc::new(AppStateInner {
639 config,
640 token,
641 stream_engine,
642 callrecord_sender,
643 endpoint,
644 registration_handles: Mutex::new(HashMap::new()),
645 alive_users: Arc::new(RwLock::new(HashSet::new())),
646 dialog_layer: dialog_layer.clone(),
647 create_invitation_handler: self.create_invitation_handler,
648 invitation: Invitation::new(dialog_layer),
649 routing_state: Arc::new(crate::call::RoutingState::new()),
650 pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
651 active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
652 total_calls: AtomicU64::new(0),
653 total_failed_calls: AtomicU64::new(0),
654 });
655
656 Ok(app_state)
657 }
658}