active_call/
app.rs

1use crate::{
2    call::{ActiveCallRef, sip::Invitation},
3    callrecord::{
4        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5    },
6    config::Config,
7    locator::RewriteTargetLocator,
8    useragent::{
9        RegisterOption,
10        invitation::FnCreateInvitationHandler,
11        invitation::{PendingDialog, PendingDialogGuard, default_create_invite_handler},
12        registration::RegistrationHandle,
13    },
14};
15
16use crate::media::{cache::set_cache_dir, engine::StreamEngine};
17use anyhow::Result;
18use chrono::{DateTime, Utc};
19use humantime::parse_duration;
20use rsip::prelude::HeadersExt;
21use rsipstack::transaction::{
22    Endpoint, TransactionReceiver,
23    endpoint::{TargetLocator, TransportEventInspector},
24};
25use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
26use std::collections::HashSet;
27use std::str::FromStr;
28use std::sync::{Arc, RwLock};
29use std::time::Duration;
30use std::{collections::HashMap, net::SocketAddr};
31use std::{path::Path, sync::atomic::AtomicU64};
32use tokio::select;
33use tokio::sync::Mutex;
34use tokio_util::sync::CancellationToken;
35use tracing::{info, warn};
36
37pub struct AppStateInner {
38    pub config: Arc<Config>,
39    pub token: CancellationToken,
40    pub stream_engine: Arc<StreamEngine>,
41    pub callrecord_sender: Option<CallRecordSender>,
42    pub endpoint: Endpoint,
43    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
44    pub alive_users: Arc<RwLock<HashSet<String>>>,
45    pub dialog_layer: Arc<DialogLayer>,
46    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
47    pub invitation: Invitation,
48    pub routing_state: Arc<crate::call::RoutingState>,
49    pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
50
51    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
52    pub total_calls: AtomicU64,
53    pub total_failed_calls: AtomicU64,
54}
55
56pub type AppState = Arc<AppStateInner>;
57
58pub struct AppStateBuilder {
59    pub config: Option<Config>,
60    pub stream_engine: Option<Arc<StreamEngine>>,
61    pub callrecord_sender: Option<CallRecordSender>,
62    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
63    pub cancel_token: Option<CancellationToken>,
64    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
65    pub config_loaded_at: Option<DateTime<Utc>>,
66    pub config_path: Option<String>,
67
68    pub message_inspector: Option<Box<dyn MessageInspector>>,
69    pub target_locator: Option<Box<dyn TargetLocator>>,
70    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
71}
72
73impl AppStateInner {
74    pub fn get_dump_events_file(&self, session_id: &String) -> String {
75        let recorder_root = self.config.recorder_path();
76        let root = Path::new(&recorder_root);
77        if !root.exists() {
78            match std::fs::create_dir_all(root) {
79                Ok(_) => {
80                    info!("created dump events root: {}", root.to_string_lossy());
81                }
82                Err(e) => {
83                    warn!(
84                        "Failed to create dump events root: {} {}",
85                        e,
86                        root.to_string_lossy()
87                    );
88                }
89            }
90        }
91        root.join(format!("{}.events.jsonl", session_id))
92            .to_string_lossy()
93            .to_string()
94    }
95
96    pub fn get_recorder_file(&self, session_id: &String) -> String {
97        let recorder_root = self.config.recorder_path();
98        let root = Path::new(&recorder_root);
99        if !root.exists() {
100            match std::fs::create_dir_all(root) {
101                Ok(_) => {
102                    info!("created recorder root: {}", root.to_string_lossy());
103                }
104                Err(e) => {
105                    warn!(
106                        "Failed to create recorder root: {} {}",
107                        e,
108                        root.to_string_lossy()
109                    );
110                }
111            }
112        }
113        let desired_ext = self.config.recorder_format().extension();
114        let mut filename = session_id.clone();
115        if !filename
116            .to_lowercase()
117            .ends_with(&format!(".{}", desired_ext.to_lowercase()))
118        {
119            filename = format!("{}.{}", filename, desired_ext);
120        }
121        root.join(filename).to_string_lossy().to_string()
122    }
123
124    pub async fn serve(self: Arc<Self>) -> Result<()> {
125        let incoming_txs = self.endpoint.incoming_transactions()?;
126        let token = self.token.child_token();
127        let endpoint_inner = self.endpoint.inner.clone();
128        let dialog_layer = self.dialog_layer.clone();
129
130        match self.start_registration().await {
131            Ok(count) => {
132                info!("registration started, count: {}", count);
133            }
134            Err(e) => {
135                warn!("failed to start registration: {:?}", e);
136            }
137        }
138
139        tokio::select! {
140            _ = token.cancelled() => {
141                info!("cancelled");
142            }
143            result = endpoint_inner.serve() => {
144                if let Err(e) = result {
145                    info!("endpoint serve error: {:?}", e);
146                }
147            }
148            result = self.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
149                if let Err(e) = result {
150                    info!("process incoming request error: {:?}", e);
151                }
152            },
153        }
154
155        // Wait for registration to stop, if not stopped within 50 seconds,
156        // force stop it.
157        let timeout = self
158            .config
159            .graceful_shutdown
160            .map(|_| Duration::from_secs(50));
161
162        match self.stop_registration(timeout).await {
163            Ok(_) => {
164                info!("registration stopped, waiting for clear");
165            }
166            Err(e) => {
167                warn!("failed to stop registration: {:?}", e);
168            }
169        }
170        info!("stopping");
171        Ok(())
172    }
173
174    async fn process_incoming_request(
175        &self,
176        dialog_layer: Arc<DialogLayer>,
177        mut incoming: TransactionReceiver,
178    ) -> Result<()> {
179        while let Some(mut tx) = incoming.recv().await {
180            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
181            info!(?key, "received transaction");
182            if tx.original.to_header()?.tag()?.as_ref().is_some() {
183                match dialog_layer.match_dialog(&tx.original) {
184                    Some(mut d) => {
185                        crate::spawn(async move {
186                            match d.handle(&mut tx).await {
187                                Ok(_) => (),
188                                Err(e) => {
189                                    info!("error handling transaction: {:?}", e);
190                                }
191                            }
192                        });
193                        continue;
194                    }
195                    None => {
196                        info!("dialog not found: {}", tx.original);
197                        match tx
198                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
199                            .await
200                        {
201                            Ok(_) => (),
202                            Err(e) => {
203                                info!("error replying to request: {:?}", e);
204                            }
205                        }
206                        continue;
207                    }
208                }
209            }
210            // out dialog, new server dialog
211            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
212            match tx.original.method {
213                rsip::Method::Invite | rsip::Method::Ack => {
214                    let invitation_handler = match self.create_invitation_handler {
215                        Some(ref create_invitation_handler) => {
216                            create_invitation_handler(self.config.handler.as_ref()).ok()
217                        }
218                        _ => default_create_invite_handler(self.config.handler.as_ref()),
219                    };
220                    let invitation_handler = match invitation_handler {
221                        Some(h) => h,
222                        None => {
223                            info!(?key, "no invite handler configured, rejecting INVITE");
224                            match tx
225                                .reply_with(
226                                    rsip::StatusCode::ServiceUnavailable,
227                                    vec![rsip::Header::Other(
228                                        "Reason".into(),
229                                        "SIP;cause=503;text=\"No invite handler configured\""
230                                            .into(),
231                                    )],
232                                    None,
233                                )
234                                .await
235                            {
236                                Ok(_) => (),
237                                Err(e) => {
238                                    info!("error replying to request: {:?}", e);
239                                }
240                            }
241                            continue;
242                        }
243                    };
244                    let contact = dialog_layer
245                        .endpoint
246                        .get_addrs()
247                        .first()
248                        .map(|addr| rsip::Uri {
249                            scheme: Some(rsip::Scheme::Sip),
250                            auth: None,
251                            host_with_port: addr.addr.clone(),
252                            params: vec![],
253                            headers: vec![],
254                        });
255
256                    let dialog = match dialog_layer.get_or_create_server_invite(
257                        &tx,
258                        state_sender,
259                        None,
260                        contact,
261                    ) {
262                        Ok(d) => d,
263                        Err(e) => {
264                            // 481 Dialog/Transaction Does Not Exist
265                            info!("failed to obtain dialog: {:?}", e);
266                            match tx
267                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
268                                .await
269                            {
270                                Ok(_) => (),
271                                Err(e) => {
272                                    info!("error replying to request: {:?}", e);
273                                }
274                            }
275                            continue;
276                        }
277                    };
278
279                    let dialog_id_str = dialog.id().to_string();
280                    let token = self.token.child_token();
281                    let pending_dialog = PendingDialog {
282                        token: token.clone(),
283                        dialog: dialog.clone(),
284                        state_receiver,
285                    };
286
287                    let guard = Arc::new(PendingDialogGuard::new(
288                        self.invitation.clone(),
289                        dialog_id_str.clone(),
290                        pending_dialog,
291                    ));
292
293                    let accept_timeout = self
294                        .config
295                        .accept_timeout
296                        .as_ref()
297                        .and_then(|t| parse_duration(t).ok())
298                        .unwrap_or_else(|| Duration::from_secs(60));
299
300                    let token_ref = token.clone();
301                    let guard_ref = guard.clone();
302                    crate::spawn(async move {
303                        select! {
304                            _ = token_ref.cancelled() => {}
305                            _ = tokio::time::sleep(accept_timeout) => {}
306                        }
307                        guard_ref.drop_async().await;
308                    });
309
310                    let mut dialog_ref = dialog.clone();
311                    let token_ref = token.clone();
312                    let routing_state = self.routing_state.clone();
313                    crate::spawn(async move {
314                        let invite_loop = async {
315                            match invitation_handler
316                                .on_invite(
317                                    dialog_id_str.clone(),
318                                    token,
319                                    dialog.clone(),
320                                    routing_state,
321                                )
322                                .await
323                            {
324                                Ok(_) => (),
325                                Err(e) => {
326                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
327                                    guard.drop_async().await;
328                                }
329                            }
330                        };
331                        select! {
332                            _ = token_ref.cancelled() => {}
333                            _ = async {
334                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
335                             } => {}
336                        }
337                    });
338                }
339                rsip::Method::Options => {
340                    info!(?key, "ignoring out-of-dialog OPTIONS request");
341                    continue;
342                }
343                _ => {
344                    info!(?key, "received request: {:?}", tx.original.method);
345                    match tx.reply(rsip::StatusCode::OK).await {
346                        Ok(_) => (),
347                        Err(e) => {
348                            info!("error replying to request: {:?}", e);
349                        }
350                    }
351                }
352            }
353        }
354        Ok(())
355    }
356
357    pub fn stop(&self) {
358        info!("stopping");
359        self.token.cancel();
360    }
361
362    pub async fn start_registration(&self) -> Result<usize> {
363        let mut count = 0;
364        if let Some(register_users) = &self.config.register_users {
365            for option in register_users.iter() {
366                match self.register(option.clone()).await {
367                    Ok(_) => {
368                        count += 1;
369                    }
370                    Err(e) => {
371                        warn!("failed to register user: {:?} {:?}", e, option);
372                    }
373                }
374            }
375        }
376        Ok(count)
377    }
378
379    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
380        for (_, handle) in self.registration_handles.lock().await.iter_mut() {
381            handle.stop();
382        }
383
384        if let Some(duration) = wait_for_clear {
385            let live_users = self.alive_users.clone();
386            let check_loop = async move {
387                loop {
388                    let is_empty = {
389                        let users = live_users
390                            .read()
391                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
392                        users.is_empty()
393                    };
394                    if is_empty {
395                        break;
396                    }
397                    tokio::time::sleep(Duration::from_millis(50)).await;
398                }
399                Ok::<(), anyhow::Error>(())
400            };
401            match tokio::time::timeout(duration, check_loop).await {
402                Ok(_) => {}
403                Err(e) => {
404                    warn!("failed to wait for clear: {}", e);
405                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
406                }
407            }
408        }
409        Ok(())
410    }
411
412    pub async fn register(&self, option: RegisterOption) -> Result<()> {
413        let user = option.aor();
414        let mut server = option.server.clone();
415        if !server.starts_with("sip:") && !server.starts_with("sips:") {
416            server = format!("sip:{}", server);
417        }
418        let sip_server = match rsip::Uri::try_from(server) {
419            Ok(uri) => uri,
420            Err(e) => {
421                warn!("failed to parse server: {} {:?}", e, option.server);
422                return Err(anyhow::anyhow!("failed to parse server: {}", e));
423            }
424        };
425        let cancel_token = self.token.child_token();
426        let handle = RegistrationHandle {
427            inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
428                endpoint_inner: self.endpoint.inner.clone(),
429                option,
430                cancel_token,
431                start_time: Mutex::new(std::time::Instant::now()),
432                last_update: Mutex::new(std::time::Instant::now()),
433                last_response: Mutex::new(None),
434            }),
435        };
436        self.registration_handles
437            .lock()
438            .await
439            .insert(user.clone(), handle.clone());
440        let alive_users = self.alive_users.clone();
441
442        crate::spawn(async move {
443            *handle.inner.start_time.lock().await = std::time::Instant::now();
444
445            select! {
446                _ = handle.inner.cancel_token.cancelled() => {
447                }
448                _ = async {
449                    loop {
450                        let user = handle.inner.option.aor();
451                        alive_users.write().unwrap().remove(&user);
452                        let refresh_time = match handle.do_register(&sip_server, None).await {
453                            Ok(expires) => {
454                                info!(
455                                    user = handle.inner.option.aor(),
456                                    expires = expires,
457                                    alive_users = alive_users.read().unwrap().len(),
458                                    "registration refreshed",
459                                );
460                                alive_users.write().unwrap().insert(user);
461                                expires * 3 / 4 // 75% of expiration time
462                            }
463                            Err(e) => {
464                                warn!(
465                                    user = handle.inner.option.aor(),
466                                    alive_users = alive_users.read().unwrap().len(),
467                                    "registration failed: {:?}", e);
468                                60
469                            }
470                        };
471                        tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
472                    }
473                } => {}
474            }
475            handle.do_register(&sip_server, Some(0)).await.ok();
476            alive_users.write().unwrap().remove(&user);
477        });
478        Ok(())
479    }
480}
481
482impl Drop for AppStateInner {
483    fn drop(&mut self) {
484        self.stop();
485    }
486}
487
488impl AppStateBuilder {
489    pub fn new() -> Self {
490        Self {
491            config: None,
492            stream_engine: None,
493            callrecord_sender: None,
494            callrecord_formatter: None,
495            cancel_token: None,
496            create_invitation_handler: None,
497            config_loaded_at: None,
498            config_path: None,
499            message_inspector: None,
500            target_locator: None,
501            transport_inspector: None,
502        }
503    }
504
505    pub fn with_config(mut self, config: Config) -> Self {
506        self.config = Some(config);
507        if self.config_loaded_at.is_none() {
508            self.config_loaded_at = Some(Utc::now());
509        }
510        self
511    }
512
513    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
514        self.stream_engine = Some(stream_engine);
515        self
516    }
517
518    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
519        self.callrecord_sender = Some(sender);
520        self
521    }
522
523    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
524        self.cancel_token = Some(token);
525        self
526    }
527
528    pub fn with_config_metadata(mut self, path: Option<String>, loaded_at: DateTime<Utc>) -> Self {
529        self.config_path = path;
530        self.config_loaded_at = Some(loaded_at);
531        self
532    }
533
534    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
535        self.message_inspector = Some(inspector);
536        self
537    }
538    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
539        self.target_locator = Some(locator);
540        self
541    }
542
543    pub fn with_transport_inspector(
544        &mut self,
545        inspector: Box<dyn TransportEventInspector>,
546    ) -> &mut Self {
547        self.transport_inspector = Some(inspector);
548        self
549    }
550
551    pub async fn build(self) -> Result<AppState> {
552        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
553        let token = self
554            .cancel_token
555            .unwrap_or_else(|| CancellationToken::new());
556        let _ = set_cache_dir(&config.media_cache_path);
557
558        let local_ip = if !config.addr.is_empty() {
559            std::net::IpAddr::from_str(config.addr.as_str())?
560        } else {
561            crate::net_tool::get_first_non_loopback_interface()?
562        };
563        let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
564        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
565
566        let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
567            local_addr,
568            None,
569            Some(token.child_token()),
570        )
571        .await
572        .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
573
574        transport_layer.add_transport(udp_conn.into());
575        info!("start useragent, addr: {}", local_addr);
576
577        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
578        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
579        if let Some(ref user_agent) = config.useragent {
580            endpoint_builder.with_user_agent(user_agent.as_str());
581        }
582
583        let mut endpoint_builder = endpoint_builder
584            .with_cancel_token(token.child_token())
585            .with_transport_layer(transport_layer)
586            .with_option(endpoint_option);
587
588        if let Some(inspector) = self.message_inspector {
589            endpoint_builder = endpoint_builder.with_inspector(inspector);
590        }
591
592        if let Some(locator) = self.target_locator {
593            endpoint_builder.with_target_locator(locator);
594        } else if let Some(ref rules) = config.rewrites {
595            endpoint_builder
596                .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
597        }
598
599        if let Some(inspector) = self.transport_inspector {
600            endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
601        }
602
603        let endpoint = endpoint_builder.build();
604        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
605
606        let stream_engine = self.stream_engine.unwrap_or_default();
607
608        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
609            formatter
610        } else {
611            let formatter = if let Some(ref callrecord) = config.callrecord {
612                DefaultCallRecordFormatter::new_with_config(callrecord)
613            } else {
614                DefaultCallRecordFormatter::default()
615            };
616            Arc::new(formatter)
617        };
618
619        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
620            Some(sender)
621        } else if let Some(ref callrecord) = config.callrecord {
622            let builder = CallRecordManagerBuilder::new()
623                .with_cancel_token(token.child_token())
624                .with_config(callrecord.clone())
625                .with_max_concurrent(32)
626                .with_formatter(callrecord_formatter.clone());
627
628            let mut callrecord_manager = builder.build();
629            let sender = callrecord_manager.sender.clone();
630            crate::spawn(async move {
631                callrecord_manager.serve().await;
632            });
633            Some(sender)
634        } else {
635            None
636        };
637
638        let app_state = Arc::new(AppStateInner {
639            config,
640            token,
641            stream_engine,
642            callrecord_sender,
643            endpoint,
644            registration_handles: Mutex::new(HashMap::new()),
645            alive_users: Arc::new(RwLock::new(HashSet::new())),
646            dialog_layer: dialog_layer.clone(),
647            create_invitation_handler: self.create_invitation_handler,
648            invitation: Invitation::new(dialog_layer),
649            routing_state: Arc::new(crate::call::RoutingState::new()),
650            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
651            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
652            total_calls: AtomicU64::new(0),
653            total_failed_calls: AtomicU64::new(0),
654        });
655
656        Ok(app_state)
657    }
658}