active_call/
app.rs

1use crate::{
2    call::{ActiveCallRef, sip::Invitation},
3    callrecord::{
4        CallRecordFormatter, CallRecordManagerBuilder, CallRecordSender, DefaultCallRecordFormatter,
5    },
6    config::Config,
7    useragent::{
8        RegisterOption,
9        invitation::FnCreateInvitationHandler,
10        invitation::{PendingDialog, PendingDialogGuard, default_create_invite_handler},
11        registration::RegistrationHandle,
12    },
13};
14
15use anyhow::Result;
16use chrono::{DateTime, Utc};
17use humantime::parse_duration;
18use rsip::prelude::HeadersExt;
19use rsipstack::dialog::dialog_layer::DialogLayer;
20use rsipstack::transaction::{Endpoint, TransactionReceiver};
21use std::collections::HashSet;
22use std::str::FromStr;
23use std::sync::{Arc, RwLock};
24use std::time::Duration;
25use std::{collections::HashMap, net::SocketAddr};
26use std::{path::Path, sync::atomic::AtomicU64};
27use tokio::select;
28use tokio::sync::Mutex;
29use tokio_util::sync::CancellationToken;
30use tracing::{info, warn};
31use voice_engine::media::{cache::set_cache_dir, engine::StreamEngine};
32
33pub struct AppStateInner {
34    pub config: Arc<Config>,
35    pub token: CancellationToken,
36    pub stream_engine: Arc<StreamEngine>,
37    pub callrecord_sender: Option<CallRecordSender>,
38    pub endpoint: Endpoint,
39    pub registration_handles: Mutex<HashMap<String, RegistrationHandle>>,
40    pub alive_users: Arc<RwLock<HashSet<String>>>,
41    pub dialog_layer: Arc<DialogLayer>,
42    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
43    pub invitation: Invitation,
44    pub routing_state: Arc<crate::call::RoutingState>,
45    pub pending_playbooks: Arc<Mutex<HashMap<String, String>>>,
46
47    pub active_calls: Arc<std::sync::Mutex<HashMap<String, ActiveCallRef>>>,
48    pub total_calls: AtomicU64,
49    pub total_failed_calls: AtomicU64,
50}
51
52pub type AppState = Arc<AppStateInner>;
53
54pub struct AppStateBuilder {
55    pub config: Option<Config>,
56    pub stream_engine: Option<Arc<StreamEngine>>,
57    pub callrecord_sender: Option<CallRecordSender>,
58    pub callrecord_formatter: Option<Arc<dyn CallRecordFormatter>>,
59    pub cancel_token: Option<CancellationToken>,
60    pub create_invitation_handler: Option<FnCreateInvitationHandler>,
61    pub config_loaded_at: Option<DateTime<Utc>>,
62    pub config_path: Option<String>,
63}
64
65impl AppStateInner {
66    pub fn get_dump_events_file(&self, session_id: &String) -> String {
67        let recorder_root = self.config.recorder_path();
68        let root = Path::new(&recorder_root);
69        if !root.exists() {
70            match std::fs::create_dir_all(root) {
71                Ok(_) => {
72                    info!("created dump events root: {}", root.to_string_lossy());
73                }
74                Err(e) => {
75                    warn!(
76                        "Failed to create dump events root: {} {}",
77                        e,
78                        root.to_string_lossy()
79                    );
80                }
81            }
82        }
83        root.join(format!("{}.events.jsonl", session_id))
84            .to_string_lossy()
85            .to_string()
86    }
87
88    pub fn get_recorder_file(&self, session_id: &String) -> String {
89        let recorder_root = self.config.recorder_path();
90        let root = Path::new(&recorder_root);
91        if !root.exists() {
92            match std::fs::create_dir_all(root) {
93                Ok(_) => {
94                    info!("created recorder root: {}", root.to_string_lossy());
95                }
96                Err(e) => {
97                    warn!(
98                        "Failed to create recorder root: {} {}",
99                        e,
100                        root.to_string_lossy()
101                    );
102                }
103            }
104        }
105        let mut recorder_file = root.join(session_id);
106        let desired_ext = self.config.recorder_format().extension();
107        let has_desired_ext = recorder_file
108            .extension()
109            .and_then(|ext| ext.to_str())
110            .map(|ext| ext.eq_ignore_ascii_case(desired_ext))
111            .unwrap_or(false);
112
113        if !has_desired_ext {
114            // Ensure the on-disk filename matches the configured recorder format extension.
115            recorder_file.set_extension(desired_ext);
116        }
117
118        recorder_file.to_string_lossy().to_string()
119    }
120
121    pub async fn serve(self: Arc<Self>) -> Result<()> {
122        let incoming_txs = self.endpoint.incoming_transactions()?;
123        let token = self.token.child_token();
124        let endpoint_inner = self.endpoint.inner.clone();
125        let dialog_layer = self.dialog_layer.clone();
126
127        match self.start_registration().await {
128            Ok(count) => {
129                info!("registration started, count: {}", count);
130            }
131            Err(e) => {
132                warn!("failed to start registration: {:?}", e);
133            }
134        }
135
136        tokio::select! {
137            _ = token.cancelled() => {
138                info!("cancelled");
139            }
140            result = endpoint_inner.serve() => {
141                if let Err(e) = result {
142                    info!("endpoint serve error: {:?}", e);
143                }
144            }
145            result = self.process_incoming_request(dialog_layer.clone(), incoming_txs) => {
146                if let Err(e) = result {
147                    info!("process incoming request error: {:?}", e);
148                }
149            },
150        }
151
152        // Wait for registration to stop, if not stopped within 50 seconds,
153        // force stop it.
154        let timeout = self
155            .config
156            .graceful_shutdown
157            .map(|_| Duration::from_secs(50));
158
159        match self.stop_registration(timeout).await {
160            Ok(_) => {
161                info!("registration stopped, waiting for clear");
162            }
163            Err(e) => {
164                warn!("failed to stop registration: {:?}", e);
165            }
166        }
167        info!("stopping");
168        Ok(())
169    }
170
171    async fn process_incoming_request(
172        &self,
173        dialog_layer: Arc<DialogLayer>,
174        mut incoming: TransactionReceiver,
175    ) -> Result<()> {
176        while let Some(mut tx) = incoming.recv().await {
177            let key: &rsipstack::transaction::key::TransactionKey = &tx.key;
178            info!(?key, "received transaction");
179            if tx.original.to_header()?.tag()?.as_ref().is_some() {
180                match dialog_layer.match_dialog(&tx.original) {
181                    Some(mut d) => {
182                        tokio::spawn(async move {
183                            match d.handle(&mut tx).await {
184                                Ok(_) => (),
185                                Err(e) => {
186                                    info!("error handling transaction: {:?}", e);
187                                }
188                            }
189                        });
190                        continue;
191                    }
192                    None => {
193                        info!("dialog not found: {}", tx.original);
194                        match tx
195                            .reply(rsip::StatusCode::CallTransactionDoesNotExist)
196                            .await
197                        {
198                            Ok(_) => (),
199                            Err(e) => {
200                                info!("error replying to request: {:?}", e);
201                            }
202                        }
203                        continue;
204                    }
205                }
206            }
207            // out dialog, new server dialog
208            let (state_sender, state_receiver) = dialog_layer.new_dialog_state_channel();
209            match tx.original.method {
210                rsip::Method::Invite | rsip::Method::Ack => {
211                    let invitation_handler = match self.create_invitation_handler {
212                        Some(ref create_invitation_handler) => {
213                            create_invitation_handler(self.config.sip_handler.as_ref()).ok()
214                        }
215                        _ => default_create_invite_handler(self.config.sip_handler.as_ref()),
216                    };
217                    let invitation_handler = match invitation_handler {
218                        Some(h) => h,
219                        None => {
220                            info!(?key, "no invite handler configured, rejecting INVITE");
221                            match tx
222                                .reply_with(
223                                    rsip::StatusCode::ServiceUnavailable,
224                                    vec![rsip::Header::Other(
225                                        "Reason".into(),
226                                        "SIP;cause=503;text=\"No invite handler configured\""
227                                            .into(),
228                                    )],
229                                    None,
230                                )
231                                .await
232                            {
233                                Ok(_) => (),
234                                Err(e) => {
235                                    info!("error replying to request: {:?}", e);
236                                }
237                            }
238                            continue;
239                        }
240                    };
241                    let contact = dialog_layer
242                        .endpoint
243                        .get_addrs()
244                        .first()
245                        .map(|addr| rsip::Uri {
246                            scheme: Some(rsip::Scheme::Sip),
247                            auth: None,
248                            host_with_port: addr.addr.clone(),
249                            params: vec![],
250                            headers: vec![],
251                        });
252
253                    let dialog = match dialog_layer.get_or_create_server_invite(
254                        &tx,
255                        state_sender,
256                        None,
257                        contact,
258                    ) {
259                        Ok(d) => d,
260                        Err(e) => {
261                            // 481 Dialog/Transaction Does Not Exist
262                            info!("failed to obtain dialog: {:?}", e);
263                            match tx
264                                .reply(rsip::StatusCode::CallTransactionDoesNotExist)
265                                .await
266                            {
267                                Ok(_) => (),
268                                Err(e) => {
269                                    info!("error replying to request: {:?}", e);
270                                }
271                            }
272                            continue;
273                        }
274                    };
275
276                    let dialog_id_str = dialog.id().to_string();
277                    let token = self.token.child_token();
278                    let pending_dialog = PendingDialog {
279                        token: token.clone(),
280                        dialog: dialog.clone(),
281                        state_receiver,
282                    };
283
284                    let guard = Arc::new(PendingDialogGuard::new(
285                        self.invitation.clone(),
286                        dialog_id_str.clone(),
287                        pending_dialog,
288                    ));
289
290                    let accept_timeout = self
291                        .config
292                        .sip_accept_timeout
293                        .as_ref()
294                        .and_then(|t| parse_duration(t).ok())
295                        .unwrap_or_else(|| Duration::from_secs(60));
296
297                    let token_ref = token.clone();
298                    let guard_ref = guard.clone();
299                    tokio::spawn(async move {
300                        select! {
301                            _ = token_ref.cancelled() => {}
302                            _ = tokio::time::sleep(accept_timeout) => {}
303                        }
304                        guard_ref.drop_async().await;
305                    });
306
307                    let mut dialog_ref = dialog.clone();
308                    let token_ref = token.clone();
309                    let routing_state = self.routing_state.clone();
310                    tokio::spawn(async move {
311                        let invite_loop = async {
312                            match invitation_handler
313                                .on_invite(
314                                    dialog_id_str.clone(),
315                                    token,
316                                    dialog.clone(),
317                                    routing_state,
318                                )
319                                .await
320                            {
321                                Ok(_) => (),
322                                Err(e) => {
323                                    info!(id = dialog_id_str, "error handling invite: {:?}", e);
324                                    guard.drop_async().await;
325                                }
326                            }
327                        };
328                        select! {
329                            _ = token_ref.cancelled() => {}
330                            _ = async {
331                                let (_,_ ) = tokio::join!(dialog_ref.handle(&mut tx), invite_loop);
332                             } => {}
333                        }
334                    });
335                }
336                rsip::Method::Options => {
337                    info!(?key, "ignoring out-of-dialog OPTIONS request");
338                    continue;
339                }
340                _ => {
341                    info!(?key, "received request: {:?}", tx.original.method);
342                    match tx.reply(rsip::StatusCode::OK).await {
343                        Ok(_) => (),
344                        Err(e) => {
345                            info!("error replying to request: {:?}", e);
346                        }
347                    }
348                }
349            }
350        }
351        Ok(())
352    }
353
354    pub fn stop(&self) {
355        info!("stopping");
356        self.token.cancel();
357    }
358
359    pub async fn start_registration(&self) -> Result<usize> {
360        let mut count = 0;
361        if let Some(register_users) = &self.config.register_users {
362            for option in register_users.iter() {
363                match self.register(option.clone()).await {
364                    Ok(_) => {
365                        count += 1;
366                    }
367                    Err(e) => {
368                        warn!("failed to register user: {:?} {:?}", e, option);
369                    }
370                }
371            }
372        }
373        Ok(count)
374    }
375
376    pub async fn stop_registration(&self, wait_for_clear: Option<Duration>) -> Result<()> {
377        for (_, handle) in self.registration_handles.lock().await.iter_mut() {
378            handle.stop();
379        }
380
381        if let Some(duration) = wait_for_clear {
382            let live_users = self.alive_users.clone();
383            let check_loop = async move {
384                loop {
385                    let is_empty = {
386                        let users = live_users
387                            .read()
388                            .map_err(|_| anyhow::anyhow!("Lock poisoned"))?;
389                        users.is_empty()
390                    };
391                    if is_empty {
392                        break;
393                    }
394                    tokio::time::sleep(Duration::from_millis(50)).await;
395                }
396                Ok::<(), anyhow::Error>(())
397            };
398            match tokio::time::timeout(duration, check_loop).await {
399                Ok(_) => {}
400                Err(e) => {
401                    warn!("failed to wait for clear: {}", e);
402                    return Err(anyhow::anyhow!("failed to wait for clear: {}", e));
403                }
404            }
405        }
406        Ok(())
407    }
408
409    pub async fn register(&self, option: RegisterOption) -> Result<()> {
410        let user = option.aor();
411        let mut server = option.server.clone();
412        if !server.starts_with("sip:") && !server.starts_with("sips:") {
413            server = format!("sip:{}", server);
414        }
415        let sip_server = match rsip::Uri::try_from(server) {
416            Ok(uri) => uri,
417            Err(e) => {
418                warn!("failed to parse server: {} {:?}", e, option.server);
419                return Err(anyhow::anyhow!("failed to parse server: {}", e));
420            }
421        };
422        let cancel_token = self.token.child_token();
423        let handle = RegistrationHandle {
424            inner: Arc::new(crate::useragent::registration::RegistrationHandleInner {
425                endpoint_inner: self.endpoint.inner.clone(),
426                option,
427                cancel_token,
428                start_time: Mutex::new(std::time::Instant::now()),
429                last_update: Mutex::new(std::time::Instant::now()),
430                last_response: Mutex::new(None),
431            }),
432        };
433        self.registration_handles
434            .lock()
435            .await
436            .insert(user.clone(), handle.clone());
437        let alive_users = self.alive_users.clone();
438
439        tokio::spawn(async move {
440            *handle.inner.start_time.lock().await = std::time::Instant::now();
441
442            select! {
443                _ = handle.inner.cancel_token.cancelled() => {
444                }
445                _ = async {
446                    loop {
447                        let user = handle.inner.option.aor();
448                        alive_users.write().unwrap().remove(&user);
449                        let refresh_time = match handle.do_register(&sip_server, None).await {
450                            Ok(expires) => {
451                                info!(
452                                    user = handle.inner.option.aor(),
453                                    expires = expires,
454                                    alive_users = alive_users.read().unwrap().len(),
455                                    "registration refreshed",
456                                );
457                                alive_users.write().unwrap().insert(user);
458                                expires * 3 / 4 // 75% of expiration time
459                            }
460                            Err(e) => {
461                                warn!(
462                                    user = handle.inner.option.aor(),
463                                    alive_users = alive_users.read().unwrap().len(),
464                                    "registration failed: {:?}", e);
465                                60
466                            }
467                        };
468                        tokio::time::sleep(Duration::from_secs(refresh_time as u64)).await;
469                    }
470                } => {}
471            }
472            handle.do_register(&sip_server, Some(0)).await.ok();
473            alive_users.write().unwrap().remove(&user);
474        });
475        Ok(())
476    }
477}
478
479impl Drop for AppStateInner {
480    fn drop(&mut self) {
481        self.stop();
482    }
483}
484
485impl AppStateBuilder {
486    pub fn new() -> Self {
487        Self {
488            config: None,
489            stream_engine: None,
490            callrecord_sender: None,
491            callrecord_formatter: None,
492            cancel_token: None,
493            create_invitation_handler: None,
494            config_loaded_at: None,
495            config_path: None,
496        }
497    }
498
499    pub fn with_config(mut self, mut config: Config) -> Self {
500        if config.ensure_recording_defaults() {
501            warn!(
502                "recorder_format=ogg requires compiling with the 'opus' feature; falling back to wav"
503            );
504        }
505        self.config = Some(config);
506        if self.config_loaded_at.is_none() {
507            self.config_loaded_at = Some(Utc::now());
508        }
509        self
510    }
511
512    pub fn with_stream_engine(mut self, stream_engine: Arc<StreamEngine>) -> Self {
513        self.stream_engine = Some(stream_engine);
514        self
515    }
516
517    pub fn with_callrecord_sender(mut self, sender: CallRecordSender) -> Self {
518        self.callrecord_sender = Some(sender);
519        self
520    }
521
522    pub fn with_cancel_token(mut self, token: CancellationToken) -> Self {
523        self.cancel_token = Some(token);
524        self
525    }
526
527    pub fn with_config_metadata(mut self, path: Option<String>, loaded_at: DateTime<Utc>) -> Self {
528        self.config_path = path;
529        self.config_loaded_at = Some(loaded_at);
530        self
531    }
532
533    pub async fn build(self) -> Result<AppState> {
534        let config: Arc<Config> = Arc::new(self.config.unwrap_or_default());
535        let token = self
536            .cancel_token
537            .unwrap_or_else(|| CancellationToken::new());
538        let _ = set_cache_dir(&config.media_cache_path);
539
540        let local_ip = if !config.sip_addr.is_empty() {
541            std::net::IpAddr::from_str(config.sip_addr.as_str())?
542        } else {
543            voice_engine::net_tool::get_first_non_loopback_interface()?
544        };
545        let transport_layer = rsipstack::transport::TransportLayer::new(token.clone());
546        let local_addr: SocketAddr = format!("{}:{}", local_ip, config.sip_port).parse()?;
547
548        let udp_conn = rsipstack::transport::udp::UdpConnection::create_connection(
549            local_addr,
550            None,
551            Some(token.child_token()),
552        )
553        .await
554        .map_err(|e| anyhow::anyhow!("Create useragent UDP connection: {} {}", local_addr, e))?;
555
556        transport_layer.add_transport(udp_conn.into());
557        info!("start useragent, addr: {}", local_addr);
558
559        let endpoint_option = rsipstack::transaction::endpoint::EndpointOption::default();
560        let mut endpoint_builder = rsipstack::EndpointBuilder::new();
561        if let Some(ref user_agent) = config.useragent {
562            endpoint_builder.with_user_agent(user_agent.as_str());
563        }
564        let endpoint = endpoint_builder
565            .with_cancel_token(token.child_token())
566            .with_transport_layer(transport_layer)
567            .with_option(endpoint_option)
568            .build();
569        let dialog_layer = Arc::new(DialogLayer::new(endpoint.inner.clone()));
570
571        let stream_engine = self.stream_engine.unwrap_or_default();
572
573        let callrecord_formatter = if let Some(formatter) = self.callrecord_formatter {
574            formatter
575        } else {
576            let formatter = if let Some(ref callrecord) = config.callrecord {
577                DefaultCallRecordFormatter::new_with_config(callrecord)
578            } else {
579                DefaultCallRecordFormatter::default()
580            };
581            Arc::new(formatter)
582        };
583
584        let callrecord_sender = if let Some(sender) = self.callrecord_sender {
585            Some(sender)
586        } else if let Some(ref callrecord) = config.callrecord {
587            let builder = CallRecordManagerBuilder::new()
588                .with_cancel_token(token.child_token())
589                .with_config(callrecord.clone())
590                .with_max_concurrent(32)
591                .with_formatter(callrecord_formatter.clone());
592
593            let mut callrecord_manager = builder.build();
594            let sender = callrecord_manager.sender.clone();
595            tokio::spawn(async move {
596                callrecord_manager.serve().await;
597            });
598            Some(sender)
599        } else {
600            None
601        };
602
603        let app_state = Arc::new(AppStateInner {
604            config,
605            token,
606            stream_engine,
607            callrecord_sender,
608            endpoint,
609            registration_handles: Mutex::new(HashMap::new()),
610            alive_users: Arc::new(RwLock::new(HashSet::new())),
611            dialog_layer: dialog_layer.clone(),
612            create_invitation_handler: self.create_invitation_handler,
613            invitation: Invitation::new(dialog_layer),
614            routing_state: Arc::new(crate::call::RoutingState::new()),
615            pending_playbooks: Arc::new(Mutex::new(HashMap::new())),
616            active_calls: Arc::new(std::sync::Mutex::new(HashMap::new())),
617            total_calls: AtomicU64::new(0),
618            total_failed_calls: AtomicU64::new(0),
619        });
620
621        Ok(app_state)
622    }
623}