rust_rcs_core/sip/
sip_session.rs

1// Copyright 2023 宋昊文
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use tokio::runtime::Runtime;
19use tokio::sync::mpsc;
20use tokio::time::sleep;
21
22use crate::internet::header;
23use crate::internet::header_field::AsHeaderField;
24use crate::internet::headers::supported::Supported;
25
26use crate::sip::sip_dialog::SipDialog;
27use crate::sip::sip_dialog::SipDialogEventCallbacks;
28use crate::sip::sip_headers::session_expires::AsSessionExpires;
29use crate::sip::sip_message::SipMessage;
30use crate::sip::sip_transaction::server_transaction::ServerTransaction;
31
32use crate::util::raw_string::ToInt;
33// use crate::util::timer::Timer;
34
35pub enum SipSessionEvent {
36    HangupBeforeConfirmingDialog(Arc<SipDialog>),
37    Started,
38    ShouldRefresh(Arc<SipDialog>), // to-do: provide a callback so we can re-try UPDATE if failed with a non-dialog-terminating error
39    Expired(Arc<SipDialog>),
40}
41
42pub trait SipSessionEventCallback {
43    fn on_event(&self, ev: SipSessionEvent);
44}
45
46pub struct SipSessionEventReceiver {
47    pub tx: mpsc::Sender<SipSessionEvent>,
48    pub rt: Arc<Runtime>,
49}
50
51impl SipSessionEventCallback for SipSessionEventReceiver {
52    fn on_event(&self, ev: SipSessionEvent) {
53        let tx = self.tx.clone();
54        self.rt.spawn(async move {
55            match tx.send(ev).await {
56                Ok(()) => {}
57                Err(e) => {}
58            }
59        });
60    }
61}
62
63pub struct EarlySession {
64    dialogs: Vec<(
65        Arc<SipDialog>,
66        Arc<dyn SipDialogEventCallbacks + Send + Sync>,
67    )>,
68}
69
70pub struct ConfirmedSession {
71    dialog: (
72        Arc<SipDialog>,
73        Arc<dyn SipDialogEventCallbacks + Send + Sync>,
74    ),
75    timeout_counter: Arc<Mutex<Option<SessionTimeoutCounter>>>,
76}
77
78pub enum SipSessionState {
79    Early(EarlySession),
80    Confirmed(ConfirmedSession),
81    HungUp,
82}
83
84pub struct SipSession<T> {
85    inner: Arc<T>,
86    state: Arc<Mutex<SipSessionState>>,
87    callback: Arc<Box<dyn SipSessionEventCallback + Send + Sync>>,
88}
89
90impl<T> SipSession<T> {
91    pub fn new<C>(inner: &Arc<T>, callback: C) -> SipSession<T>
92    where
93        C: SipSessionEventCallback + Send + Sync + 'static,
94    {
95        SipSession {
96            inner: Arc::clone(inner),
97            state: Arc::new(Mutex::new(SipSessionState::Early(EarlySession {
98                dialogs: Vec::new(),
99            }))),
100            callback: Arc::new(Box::new(callback)),
101        }
102    }
103
104    pub fn get_inner(&self) -> Arc<T> {
105        Arc::clone(&self.inner)
106    }
107
108    pub fn setup_early_dialog<C>(&self, dialog: &Arc<SipDialog>, callback: C)
109    where
110        C: SipDialogEventCallbacks + Send + Sync + 'static,
111    {
112        let mut guard = self.state.lock().unwrap();
113        match &mut *guard {
114            SipSessionState::Early(early) => {
115                let callback = dialog.register_user(callback);
116                early.dialogs.push((Arc::clone(dialog), callback));
117            }
118            _ => {}
119        }
120    }
121
122    pub fn setup_confirmed_dialog<C>(&self, dialog: &Arc<SipDialog>, callback: C)
123    where
124        C: SipDialogEventCallbacks + Send + Sync + 'static,
125    {
126        let mut guard = self.state.lock().unwrap();
127        match *guard {
128            SipSessionState::Early(_) => {
129                let callback = dialog.register_user(callback);
130                *guard = SipSessionState::Confirmed(ConfirmedSession {
131                    dialog: (Arc::clone(dialog), callback),
132                    timeout_counter: Arc::new(Mutex::new(None)),
133                });
134                self.callback.on_event(SipSessionEvent::Started);
135            }
136            _ => {}
137        }
138    }
139
140    pub fn mark_session_active(&self) {
141        let guard = self.state.lock().unwrap();
142        match &*guard {
143            SipSessionState::Confirmed(confirmed) => {
144                let mut guard = confirmed.timeout_counter.lock().unwrap();
145                match &mut *guard {
146                    Some(counter) => {
147                        counter.mark_active();
148                    }
149                    _ => {}
150                }
151            }
152            _ => {}
153        }
154    }
155
156    pub fn schedule_refresh(
157        &self,
158        timeout: u32,
159        is_refresher: bool,
160        rt: &Arc<Runtime>, /* timer: &Timer */
161    ) {
162        let guard = self.state.lock().unwrap();
163        match &*guard {
164            SipSessionState::Confirmed(confirmed) => {
165                let duration = Duration::from_secs(timeout.into());
166                let mut guard = confirmed.timeout_counter.lock().unwrap();
167                match &mut *guard {
168                    Some(counter) => {
169                        let timer_counter = Arc::clone(&confirmed.timeout_counter);
170                        if let Some(expiration) = counter.mark_expiration(duration, is_refresher) {
171                            let (dialog, _) = &confirmed.dialog;
172                            schedule(
173                                timer_counter,
174                                expiration,
175                                duration,
176                                // timer,
177                                dialog,
178                                rt,
179                                &self.callback,
180                            );
181                        }
182                    }
183                    None => {
184                        let expiration = Instant::now() + duration;
185                        *guard = Some(SessionTimeoutCounter::new(expiration, is_refresher));
186                        let timer_counter = Arc::clone(&confirmed.timeout_counter);
187                        let (dialog, _) = &confirmed.dialog;
188                        schedule(
189                            timer_counter,
190                            expiration,
191                            duration,
192                            // timer,
193                            dialog,
194                            rt,
195                            &self.callback,
196                        );
197                    }
198                }
199            }
200            _ => {}
201        }
202    }
203
204    pub fn hang_up(&self) {
205        let mut guard = self.state.lock().unwrap();
206        match &*guard {
207            SipSessionState::Early(early) => {
208                for (dialog, callback) in &early.dialogs {
209                    // let completed = dialog.unregister_user(*id);
210                    // if completed {
211                    //     dialog.call_last_user_removed_callback(Arc::clone(dialog));
212                    // }
213                    if let Some(on_dispose) = dialog.unregister_user(callback) {
214                        // to-do: make sure dialog is actually in Early State
215                        let dialog = Arc::clone(dialog);
216                        on_dispose(dialog);
217                    }
218                    let dialog = Arc::clone(dialog);
219                    self.callback
220                        .on_event(SipSessionEvent::HangupBeforeConfirmingDialog(dialog));
221                }
222                *guard = SipSessionState::HungUp;
223            }
224            SipSessionState::Confirmed(confirmed) => {
225                let (dialog, callback) = &confirmed.dialog;
226                // let completed = dialog.unregister_user(*id);
227                // if completed {
228                //     dialog.call_last_user_removed_callback(Arc::clone(dialog));
229                // }
230                if let Some(on_dispose) = dialog.unregister_user(callback) {
231                    let dialog = Arc::clone(dialog);
232                    on_dispose(dialog);
233                }
234                *guard = SipSessionState::HungUp;
235            }
236            _ => {}
237        }
238    }
239}
240
241pub enum Refresher {
242    UAC,
243    UAS,
244}
245
246pub struct SessionTimeoutCounter {
247    active_until: Instant,
248    expiration: Instant,
249    is_refresher: bool,
250}
251
252impl SessionTimeoutCounter {
253    pub fn new(expiration: Instant, is_refresher: bool) -> SessionTimeoutCounter {
254        SessionTimeoutCounter {
255            active_until: Instant::now(),
256            expiration,
257            is_refresher,
258        }
259    }
260
261    pub fn mark_active(&mut self) {
262        self.active_until = Instant::now();
263    }
264
265    pub fn expiration(&self) -> Instant {
266        self.expiration
267    }
268
269    pub fn mark_expiration(&mut self, duration: Duration, is_refresher: bool) -> Option<Instant> {
270        let expiration = Instant::now() + duration;
271        self.is_refresher = is_refresher;
272        if self.expiration < expiration {
273            self.expiration = expiration;
274            return Some(expiration);
275        }
276        None
277    }
278
279    pub fn on_refresh_timer(
280        &self,
281        dialog: Arc<SipDialog>,
282        callback: Arc<Box<dyn SipSessionEventCallback + Send + Sync>>,
283    ) {
284        let idle_time = Instant::now() - self.active_until;
285        if idle_time < Duration::from_secs(3600) {
286            if self.is_refresher {
287                callback.on_event(SipSessionEvent::ShouldRefresh(dialog));
288            }
289        }
290    }
291
292    pub fn on_expire_timer(
293        &self,
294        dialog: Arc<SipDialog>,
295        callback: Arc<Box<dyn SipSessionEventCallback + Send + Sync>>,
296    ) {
297        if Instant::now() > self.expiration {
298            callback.on_event(SipSessionEvent::Expired(dialog));
299        }
300    }
301}
302
303fn schedule(
304    timer_counter: Arc<Mutex<Option<SessionTimeoutCounter>>>,
305    expiration: Instant,
306    duration: Duration,
307    // timer: &Timer,
308    dialog: &Arc<SipDialog>,
309    rt: &Arc<Runtime>,
310    callback: &Arc<Box<dyn SipSessionEventCallback + Send + Sync>>,
311) {
312    let duration_refresh = duration - Duration::from_secs(120); // to-do: lower limit
313    let timer_counter_ = Arc::clone(&timer_counter);
314    let dialog_ = Arc::clone(dialog);
315    let callback_ = Arc::clone(callback);
316    // timer.schedule(duration_refresh, move || {
317    //     let timer_counter = timer_counter_;
318    //     let mut guard = timer_counter.lock().unwrap();
319    //     match &mut *guard {
320    //         Some(timer_counter) => {
321    //             timer_counter.on_refresh_timer(dialog_, callback_);
322    //         }
323    //         None => {}
324    //     }
325    // });
326    rt.spawn(async move {
327        sleep(duration_refresh).await;
328        let timer_counter = timer_counter_;
329        let mut guard = timer_counter.lock().unwrap();
330        match &mut *guard {
331            Some(timer_counter) => {
332                timer_counter.on_refresh_timer(dialog_, callback_);
333            }
334            None => {}
335        }
336    });
337
338    let dialog_ = Arc::clone(dialog);
339    let callback_ = Arc::clone(callback);
340    // timer.schedule(duration, move || {
341    //     let mut guard = timer_counter.lock().unwrap();
342    //     match &mut *guard {
343    //         Some(timer_counter) => {
344    //             if expiration == timer_counter.expiration() {
345    //                 timer_counter.on_expire_timer(dialog_, callback_);
346    //             }
347    //         }
348    //         None => {}
349    //     }
350    // });
351    rt.spawn(async move {
352        sleep(duration).await;
353        let mut guard = timer_counter.lock().unwrap();
354        match &mut *guard {
355            Some(timer_counter) => {
356                if expiration == timer_counter.expiration() {
357                    timer_counter.on_expire_timer(dialog_, callback_);
358                }
359            }
360            None => {}
361        }
362    });
363}
364
365pub fn choose_timeout_on_client_transaction_completion(
366    // transaction: &Arc<ClientTransaction>,
367    // request_session_expires_header: Option<&Header>,
368    wanted_uac_timeout: Option<u32>,
369    message: &SipMessage,
370) -> Option<(u32, Refresher)> {
371    if let SipMessage::Response(_, Some(resp_headers), _) = message {
372        if let Some(session_expires_header) = header::search(resp_headers, b"Session-Expires", true)
373        {
374            let session_expires_header_field = session_expires_header.get_value().as_header_field();
375            if let Some(session_expires) = session_expires_header_field.as_session_expires() {
376                if let Some(refresher) = session_expires.refresher {
377                    if refresher == b"uac" {
378                        return Some((session_expires.timeout, Refresher::UAC));
379                    } else if refresher == b"uas" {
380                        return Some((session_expires.timeout, Refresher::UAS));
381                    }
382                }
383            }
384        }
385
386        // if let Some(session_expires_header) = request_session_expires_header {
387        //     let session_expires_header_field = session_expires_header.get_value().as_header_field();
388        //     if let Some(session_expires) = session_expires_header_field.as_session_expires() {
389        //         if let Some(refresher) = session_expires.refresher {
390        //             if refresher == b"uac" {
391        //                 return Some((session_expires.timeout, Refresher::UAC));
392        //             }
393        //         }
394        //     }
395        // }
396
397        if let Some(timeout) = wanted_uac_timeout {
398            return Some((timeout, Refresher::UAC));
399        }
400    }
401
402    None
403}
404
405/// On 422 error, includes a Min-SE
406pub fn choose_timeout_for_server_transaction_response(
407    transaction: &Arc<ServerTransaction>,
408    previously_supports_timer: bool,
409    previous_refresher: Refresher,
410) -> Result<Option<(u32, Refresher)>, (u16, &'static [u8], u32)> {
411    if let SipMessage::Request(_, Some(headers), _) = transaction.message() {
412        let mut explicitly_supports_timer = false;
413
414        if let Some(supported_header) = header::search(headers, b"Supported", true) {
415            if supported_header.supports(b"timer") {
416                explicitly_supports_timer = true;
417            }
418        }
419
420        if let Some(session_expires_header) = header::search(headers, b"Session-Expires", true) {
421            let session_expires_header_field = session_expires_header.get_value().as_header_field();
422            if let Some(session_expires) = session_expires_header_field.as_session_expires() {
423                if session_expires.timeout < 90 {
424                    return Err((422, b"Session Interval Too Small", 90));
425                } else {
426                    if let Some(refresher) = session_expires.refresher {
427                        if !explicitly_supports_timer {
428                            return Err((400, b"Bad Request", 0));
429                        }
430                        if refresher == b"uac" {
431                            return Ok(Some((session_expires.timeout, Refresher::UAC)));
432                        } else if refresher == b"uas" {
433                            return Ok(Some((session_expires.timeout, Refresher::UAS)));
434                        }
435                    }
436
437                    if explicitly_supports_timer {
438                        return Ok(Some((session_expires.timeout, previous_refresher)));
439                    } else if previously_supports_timer {
440                        return Ok(Some((session_expires.timeout, Refresher::UAS)));
441                    }
442                }
443            }
444        }
445
446        if let Some(min_se_header) = header::search(headers, b"Min-SE", true) {
447            if let Ok(min_se) = min_se_header.get_value().to_int() {
448                if explicitly_supports_timer {
449                    return Ok(Some((min_se, previous_refresher)));
450                } else if previously_supports_timer {
451                    return Ok(Some((min_se, Refresher::UAS)));
452                }
453            }
454        }
455
456        if explicitly_supports_timer {
457            return Ok(Some((900, previous_refresher)));
458        } else if previously_supports_timer {
459            return Ok(Some((900, Refresher::UAS)));
460        }
461    }
462
463    Ok(None)
464}