active_call/
app.rs

1use crate::{
2    call::{ActiveCallRef, sip::Invitation},
3    callrecord::{
4        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5    },
6    config::Config,
7    locator::RewriteTargetLocator,
8    useragent::{
9        RegisterOption,
10        invitation::FnCreateInvitationHandler,
11        invitation::{PendingDialog, PendingDialogGuard, default_create_invite_handler},
12        registration::RegistrationHandle,
13    },
14};
15
16use crate::media::{cache::set_cache_dir, engine::StreamEngine};
17use anyhow::Result;
18use chrono::{DateTime, Utc};
19use humantime::parse_duration;
20use rsip::prelude::HeadersExt;
21use rsipstack::transaction::{
22    Endpoint, TransactionReceiver,
23    endpoint::{TargetLocator, TransportEventInspector},
24};
25use rsipstack::{dialog::dialog_layer::DialogLayer, transaction::endpoint::MessageInspector};
26use std::collections::HashSet;
27use std::str::FromStr;
28use std::sync::{Arc, RwLock};
29use std::time::Duration;
30use std::{collections::HashMap, net::SocketAddr};
31use std::{path::Path, sync::atomic::AtomicU64};
32use tokio::select;
33use tokio::sync::Mutex;
34use tokio_util::sync::CancellationToken;
35use tracing::{info, warn};
36
37pub struct AppStateInner {
38    pub config: Arc<Config>,
39    pub token: CancellationToken,
40    pub stream_engine: Arc<StreamEngine>,
41    pub callrecord_sender: Option<CallRecordSender>,
42    pub endpoint: Endpoint,
43    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
44    pub alive_users: Arc<RwLock<HashSet<String>>>,
45    pub dialog_layer: Arc<DialogLayer>,
46    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
47    pub invitation: Invitation,
48    pub routing_state: Arc<crate::call::RoutingState>,
49    pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
50
51    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
52    pub total_calls: AtomicU64,
53    pub total_failed_calls: AtomicU64,
54}
55
56pub type AppState = Arc<AppStateInner>;
57
58pub struct AppStateBuilder {
59    pub config: Option<Config>,
60    pub stream_engine: Option<Arc<StreamEngine>>,
61    pub callrecord_sender: Option<CallRecordSender>,
62    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
63    pub cancel_token: Option<CancellationToken>,
64    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
65    pub config_loaded_at: Option<DateTime<Utc>>,
66    pub config_path: Option<String>,
67
68    pub message_inspector: Option<Box<dyn MessageInspector>>,
69    pub target_locator: Option<Box<dyn TargetLocator>>,
70    pub transport_inspector: Option<Box<dyn TransportEventInspector>>,
71}
72
73impl AppStateInner {
74    pub fn get_dump_events_file(&self, session_id: &String) -> String {
75        let recorder_root = self.config.recorder_path();
76        let root = Path::new(&recorder_root);
77        if !root.exists() {
78            match std::fs::create_dir_all(root) {
79                Ok(_) => {
80                    info!("created dump events root: {}", root.to_string_lossy());
81                }
82                Err(e) => {
83                    warn!(
84                        "Failed to create dump events root: {} {}",
85                        e,
86                        root.to_string_lossy()
87                    );
88                }
89            }
90        }
91        root.join(format!("{}.events.jsonl", session_id))
92            .to_string_lossy()
93            .to_string()
94    }
95
96    pub fn get_recorder_file(&self, session_id: &String) -> String {
97        let recorder_root = self.config.recorder_path();
98        let root = Path::new(&recorder_root);
99        if !root.exists() {
100            match std::fs::create_dir_all(root) {
101                Ok(_) => {
102                    info!("created recorder root: {}", root.to_string_lossy());
103                }
104                Err(e) => {
105                    warn!(
106                        "Failed to create recorder root: {} {}",
107                        e,
108                        root.to_string_lossy()
109                    );
110                }
111            }
112        }
113        let mut recorder_file = root.join(session_id);
114        let desired_ext = self.config.recorder_format().extension();
115        let has_desired_ext = recorder_file
116            .extension()
117            .and_then(|ext| ext.to_str())
118            .map(|ext| ext.eq_ignore_ascii_case(desired_ext))
119            .unwrap_or(false);
120
121        if !has_desired_ext {
122            // Ensure the on-disk filename matches the configured recorder format extension.
123            recorder_file.set_extension(desired_ext);
124        }
125
126        recorder_file.to_string_lossy().to_string()
127    }
128
129    pub async fn serve(self: Arc<Self>) -> Result<()> {
130        let incoming_txs = self.endpoint.incoming_transactions()?;
131        let token = self.token.child_token();
132        let endpoint_inner = self.endpoint.inner.clone();
133        let dialog_layer = self.dialog_layer.clone();
134
135        match self.start_registration().await {
136            Ok(count) => {
137                info!("registration started, count: {}", count);
138            }
139            Err(e) => {
140                warn!("failed to start registration: {:?}", e);
141            }
142        }
143
144        tokio::select! {
145            _ = token.cancelled() => {
146                info!("cancelled");
147            }
148            result = endpoint_inner.serve() => {
149                if let Err(e) = result {
150                    info!("endpoint serve error: {:?}", e);
151                }
152            }
153            result = self.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
154                if let Err(e) = result {
155                    info!("process incoming request error: {:?}", e);
156                }
157            },
158        }
159
160        // Wait for registration to stop, if not stopped within 50 seconds,
161        // force stop it.
162        let timeout = self
163            .config
164            .graceful_shutdown
165            .map(|_| Duration::from_secs(50));
166
167        match self.stop_registration(timeout).await {
168            Ok(_) => {
169                info!("registration stopped, waiting for clear");
170            }
171            Err(e) => {
172                warn!("failed to stop registration: {:?}", e);
173            }
174        }
175        info!("stopping");
176        Ok(())
177    }
178
179    async fn process_incoming_request(
180        &self,
181        dialog_layer: Arc<DialogLayer>,
182        mut incoming: TransactionReceiver,
183    ) -> Result<()> {
184        while let Some(mut tx) = incoming.recv().await {
185            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
186            info!(?key, "received transaction");
187            if tx.original.to_header()?.tag()?.as_ref().is_some() {
188                match dialog_layer.match_dialog(&tx.original) {
189                    Some(mut d) => {
190                        tokio::spawn(async move {
191                            match d.handle(&mut tx).await {
192                                Ok(_) => (),
193                                Err(e) => {
194                                    info!("error handling transaction: {:?}", e);
195                                }
196                            }
197                        });
198                        continue;
199                    }
200                    None => {
201                        info!("dialog not found: {}", tx.original);
202                        match tx
203                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
204                            .await
205                        {
206                            Ok(_) => (),
207                            Err(e) => {
208                                info!("error replying to request: {:?}", e);
209                            }
210                        }
211                        continue;
212                    }
213                }
214            }
215            // out dialog, new server dialog
216            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
217            match tx.original.method {
218                rsip::Method::Invite | rsip::Method::Ack => {
219                    let invitation_handler = match self.create_invitation_handler {
220                        Some(ref create_invitation_handler) => {
221                            create_invitation_handler(self.config.handler.as_ref()).ok()
222                        }
223                        _ => default_create_invite_handler(self.config.handler.as_ref()),
224                    };
225                    let invitation_handler = match invitation_handler {
226                        Some(h) => h,
227                        None => {
228                            info!(?key, "no invite handler configured, rejecting INVITE");
229                            match tx
230                                .reply_with(
231                                    rsip::StatusCode::ServiceUnavailable,
232                                    vec![rsip::Header::Other(
233                                        "Reason".into(),
234                                        "SIP;cause=503;text=\"No invite handler configured\""
235                                            .into(),
236                                    )],
237                                    None,
238                                )
239                                .await
240                            {
241                                Ok(_) => (),
242                                Err(e) => {
243                                    info!("error replying to request: {:?}", e);
244                                }
245                            }
246                            continue;
247                        }
248                    };
249                    let contact = dialog_layer
250                        .endpoint
251                        .get_addrs()
252                        .first()
253                        .map(|addr| rsip::Uri {
254                            scheme: Some(rsip::Scheme::Sip),
255                            auth: None,
256                            host_with_port: addr.addr.clone(),
257                            params: vec![],
258                            headers: vec![],
259                        });
260
261                    let dialog = match dialog_layer.get_or_create_server_invite(
262                        &tx,
263                        state_sender,
264                        None,
265                        contact,
266                    ) {
267                        Ok(d) => d,
268                        Err(e) => {
269                            // 481 Dialog/Transaction Does Not Exist
270                            info!("failed to obtain dialog: {:?}", e);
271                            match tx
272                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
273                                .await
274                            {
275                                Ok(_) => (),
276                                Err(e) => {
277                                    info!("error replying to request: {:?}", e);
278                                }
279                            }
280                            continue;
281                        }
282                    };
283
284                    let dialog_id_str = dialog.id().to_string();
285                    let token = self.token.child_token();
286                    let pending_dialog = PendingDialog {
287                        token: token.clone(),
288                        dialog: dialog.clone(),
289                        state_receiver,
290                    };
291
292                    let guard = Arc::new(PendingDialogGuard::new(
293                        self.invitation.clone(),
294                        dialog_id_str.clone(),
295                        pending_dialog,
296                    ));
297
298                    let accept_timeout = self
299                        .config
300                        .accept_timeout
301                        .as_ref()
302                        .and_then(|t| parse_duration(t).ok())
303                        .unwrap_or_else(|| Duration::from_secs(60));
304
305                    let token_ref = token.clone();
306                    let guard_ref = guard.clone();
307                    tokio::spawn(async move {
308                        select! {
309                            _ = token_ref.cancelled() => {}
310                            _ = tokio::time::sleep(accept_timeout) => {}
311                        }
312                        guard_ref.drop_async().await;
313                    });
314
315                    let mut dialog_ref = dialog.clone();
316                    let token_ref = token.clone();
317                    let routing_state = self.routing_state.clone();
318                    tokio::spawn(async move {
319                        let invite_loop = async {
320                            match invitation_handler
321                                .on_invite(
322                                    dialog_id_str.clone(),
323                                    token,
324                                    dialog.clone(),
325                                    routing_state,
326                                )
327                                .await
328                            {
329                                Ok(_) => (),
330                                Err(e) => {
331                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
332                                    guard.drop_async().await;
333                                }
334                            }
335                        };
336                        select! {
337                            _ = token_ref.cancelled() => {}
338                            _ = async {
339                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
340                             } => {}
341                        }
342                    });
343                }
344                rsip::Method::Options => {
345                    info!(?key, "ignoring out-of-dialog OPTIONS request");
346                    continue;
347                }
348                _ => {
349                    info!(?key, "received request: {:?}", tx.original.method);
350                    match tx.reply(rsip::StatusCode::OK).await {
351                        Ok(_) => (),
352                        Err(e) => {
353                            info!("error replying to request: {:?}", e);
354                        }
355                    }
356                }
357            }
358        }
359        Ok(())
360    }
361
362    pub fn stop(&self) {
363        info!("stopping");
364        self.token.cancel();
365    }
366
367    pub async fn start_registration(&self) -> Result<usize> {
368        let mut count = 0;
369        if let Some(register_users) = &self.config.register_users {
370            for option in register_users.iter() {
371                match self.register(option.clone()).await {
372                    Ok(_) => {
373                        count += 1;
374                    }
375                    Err(e) => {
376                        warn!("failed to register user: {:?} {:?}", e, option);
377                    }
378                }
379            }
380        }
381        Ok(count)
382    }
383
384    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
385        for (_, handle) in self.registration_handles.lock().await.iter_mut() {
386            handle.stop();
387        }
388
389        if let Some(duration) = wait_for_clear {
390            let live_users = self.alive_users.clone();
391            let check_loop = async move {
392                loop {
393                    let is_empty = {
394                        let users = live_users
395                            .read()
396                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
397                        users.is_empty()
398                    };
399                    if is_empty {
400                        break;
401                    }
402                    tokio::time::sleep(Duration::from_millis(50)).await;
403                }
404                Ok::<(), anyhow::Error>(())
405            };
406            match tokio::time::timeout(duration, check_loop).await {
407                Ok(_) => {}
408                Err(e) => {
409                    warn!("failed to wait for clear: {}", e);
410                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
411                }
412            }
413        }
414        Ok(())
415    }
416
417    pub async fn register(&self, option: RegisterOption) -> Result<()> {
418        let user = option.aor();
419        let mut server = option.server.clone();
420        if !server.starts_with("sip:") && !server.starts_with("sips:") {
421            server = format!("sip:{}", server);
422        }
423        let sip_server = match rsip::Uri::try_from(server) {
424            Ok(uri) => uri,
425            Err(e) => {
426                warn!("failed to parse server: {} {:?}", e, option.server);
427                return Err(anyhow::anyhow!("failed to parse server: {}", e));
428            }
429        };
430        let cancel_token = self.token.child_token();
431        let handle = RegistrationHandle {
432            inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
433                endpoint_inner: self.endpoint.inner.clone(),
434                option,
435                cancel_token,
436                start_time: Mutex::new(std::time::Instant::now()),
437                last_update: Mutex::new(std::time::Instant::now()),
438                last_response: Mutex::new(None),
439            }),
440        };
441        self.registration_handles
442            .lock()
443            .await
444            .insert(user.clone(), handle.clone());
445        let alive_users = self.alive_users.clone();
446
447        tokio::spawn(async move {
448            *handle.inner.start_time.lock().await = std::time::Instant::now();
449
450            select! {
451                _ = handle.inner.cancel_token.cancelled() => {
452                }
453                _ = async {
454                    loop {
455                        let user = handle.inner.option.aor();
456                        alive_users.write().unwrap().remove(&user);
457                        let refresh_time = match handle.do_register(&sip_server, None).await {
458                            Ok(expires) => {
459                                info!(
460                                    user = handle.inner.option.aor(),
461                                    expires = expires,
462                                    alive_users = alive_users.read().unwrap().len(),
463                                    "registration refreshed",
464                                );
465                                alive_users.write().unwrap().insert(user);
466                                expires * 3 / 4 // 75% of expiration time
467                            }
468                            Err(e) => {
469                                warn!(
470                                    user = handle.inner.option.aor(),
471                                    alive_users = alive_users.read().unwrap().len(),
472                                    "registration failed: {:?}", e);
473                                60
474                            }
475                        };
476                        tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
477                    }
478                } => {}
479            }
480            handle.do_register(&sip_server, Some(0)).await.ok();
481            alive_users.write().unwrap().remove(&user);
482        });
483        Ok(())
484    }
485}
486
487impl Drop for AppStateInner {
488    fn drop(&mut self) {
489        self.stop();
490    }
491}
492
493impl AppStateBuilder {
494    pub fn new() -> Self {
495        Self {
496            config: None,
497            stream_engine: None,
498            callrecord_sender: None,
499            callrecord_formatter: None,
500            cancel_token: None,
501            create_invitation_handler: None,
502            config_loaded_at: None,
503            config_path: None,
504            message_inspector: None,
505            target_locator: None,
506            transport_inspector: None,
507        }
508    }
509
510    pub fn with_config(mut self, mut config: Config) -> Self {
511        if config.ensure_recording_defaults() {
512            warn!(
513                "recorder_format=ogg requires compiling with the 'opus' feature; falling back to wav"
514            );
515        }
516        self.config = Some(config);
517        if self.config_loaded_at.is_none() {
518            self.config_loaded_at = Some(Utc::now());
519        }
520        self
521    }
522
523    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
524        self.stream_engine = Some(stream_engine);
525        self
526    }
527
528    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
529        self.callrecord_sender = Some(sender);
530        self
531    }
532
533    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
534        self.cancel_token = Some(token);
535        self
536    }
537
538    pub fn with_config_metadata(mut self, path: Option<String>, loaded_at: DateTime<Utc>) -> Self {
539        self.config_path = path;
540        self.config_loaded_at = Some(loaded_at);
541        self
542    }
543
544    pub fn with_inspector(&mut self, inspector: Box<dyn MessageInspector>) -> &mut Self {
545        self.message_inspector = Some(inspector);
546        self
547    }
548    pub fn with_target_locator(&mut self, locator: Box<dyn TargetLocator>) -> &mut Self {
549        self.target_locator = Some(locator);
550        self
551    }
552
553    pub fn with_transport_inspector(
554        &mut self,
555        inspector: Box<dyn TransportEventInspector>,
556    ) -> &mut Self {
557        self.transport_inspector = Some(inspector);
558        self
559    }
560
561    pub async fn build(self) -> Result<AppState> {
562        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
563        let token = self
564            .cancel_token
565            .unwrap_or_else(|| CancellationToken::new());
566        let _ = set_cache_dir(&config.media_cache_path);
567
568        let local_ip = if !config.addr.is_empty() {
569            std::net::IpAddr::from_str(config.addr.as_str())?
570        } else {
571            crate::net_tool::get_first_non_loopback_interface()?
572        };
573        let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
574        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.udp_port).parse()?;
575
576        let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
577            local_addr,
578            None,
579            Some(token.child_token()),
580        )
581        .await
582        .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
583
584        transport_layer.add_transport(udp_conn.into());
585        info!("start useragent, addr: {}", local_addr);
586
587        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
588        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
589        if let Some(ref user_agent) = config.useragent {
590            endpoint_builder.with_user_agent(user_agent.as_str());
591        }
592
593        let mut endpoint_builder = endpoint_builder
594            .with_cancel_token(token.child_token())
595            .with_transport_layer(transport_layer)
596            .with_option(endpoint_option);
597
598        if let Some(inspector) = self.message_inspector {
599            endpoint_builder = endpoint_builder.with_inspector(inspector);
600        }
601
602        if let Some(locator) = self.target_locator {
603            endpoint_builder.with_target_locator(locator);
604        } else if let Some(ref rules) = config.rewrites {
605            endpoint_builder
606                .with_target_locator(Box::new(RewriteTargetLocator::new(rules.clone())));
607        }
608
609        if let Some(inspector) = self.transport_inspector {
610            endpoint_builder = endpoint_builder.with_transport_inspector(inspector);
611        }
612
613        let endpoint = endpoint_builder.build();
614        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
615
616        let stream_engine = self.stream_engine.unwrap_or_default();
617
618        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
619            formatter
620        } else {
621            let formatter = if let Some(ref callrecord) = config.callrecord {
622                DefaultCallRecordFormatter::new_with_config(callrecord)
623            } else {
624                DefaultCallRecordFormatter::default()
625            };
626            Arc::new(formatter)
627        };
628
629        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
630            Some(sender)
631        } else if let Some(ref callrecord) = config.callrecord {
632            let builder = CallRecordManagerBuilder::new()
633                .with_cancel_token(token.child_token())
634                .with_config(callrecord.clone())
635                .with_max_concurrent(32)
636                .with_formatter(callrecord_formatter.clone());
637
638            let mut callrecord_manager = builder.build();
639            let sender = callrecord_manager.sender.clone();
640            tokio::spawn(async move {
641                callrecord_manager.serve().await;
642            });
643            Some(sender)
644        } else {
645            None
646        };
647
648        let app_state = Arc::new(AppStateInner {
649            config,
650            token,
651            stream_engine,
652            callrecord_sender,
653            endpoint,
654            registration_handles: Mutex::new(HashMap::new()),
655            alive_users: Arc::new(RwLock::new(HashSet::new())),
656            dialog_layer: dialog_layer.clone(),
657            create_invitation_handler: self.create_invitation_handler,
658            invitation: Invitation::new(dialog_layer),
659            routing_state: Arc::new(crate::call::RoutingState::new()),
660            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
661            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
662            total_calls: AtomicU64::new(0),
663            total_failed_calls: AtomicU64::new(0),
664        });
665
666        Ok(app_state)
667    }
668}