1use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use tokio::runtime::Runtime;
19use tokio::sync::mpsc;
20use tokio::time::sleep;
21
22use crate::internet::header;
23use crate::internet::header_field::AsHeaderField;
24use crate::internet::headers::supported::Supported;
25
26use crate::sip::sip_dialog::SipDialog;
27use crate::sip::sip_dialog::SipDialogEventCallbacks;
28use crate::sip::sip_headers::session_expires::AsSessionExpires;
29use crate::sip::sip_message::SipMessage;
30use crate::sip::sip_transaction::server_transaction::ServerTransaction;
31
32use crate::util::raw_string::ToInt;
33pub enum SipSessionEvent {
36 HangupBeforeConfirmingDialog(Arc<SipDialog>),
37 Started,
38 ShouldRefresh(Arc<SipDialog>), Expired(Arc<SipDialog>),
40}
41
42pub trait SipSessionEventCallback {
43 fn on_event(&self, ev: SipSessionEvent);
44}
45
46pub struct SipSessionEventReceiver {
47 pub tx: mpsc::Sender<SipSessionEvent>,
48 pub rt: Arc<Runtime>,
49}
50
51impl SipSessionEventCallback for SipSessionEventReceiver {
52 fn on_event(&self, ev: SipSessionEvent) {
53 let tx = self.tx.clone();
54 self.rt.spawn(async move {
55 match tx.send(ev).await {
56 Ok(()) => {}
57 Err(e) => {}
58 }
59 });
60 }
61}
62
63pub struct EarlySession {
64 dialogs: Vec<(
65 Arc<SipDialog>,
66 Arc<dyn SipDialogEventCallbacks + Send + Sync>,
67 )>,
68}
69
70pub struct ConfirmedSession {
71 dialog: (
72 Arc<SipDialog>,
73 Arc<dyn SipDialogEventCallbacks + Send + Sync>,
74 ),
75 timeout_counter: Arc<Mutex<Option<SessionTimeoutCounter>>>,
76}
77
78pub enum SipSessionState {
79 Early(EarlySession),
80 Confirmed(ConfirmedSession),
81 HungUp,
82}
83
84pub struct SipSession<T> {
85 inner: Arc<T>,
86 state: Arc<Mutex<SipSessionState>>,
87 callback: Arc<Box<dyn SipSessionEventCallback + Send + Sync>>,
88}
89
90impl<T> SipSession<T> {
91 pub fn new<C>(inner: &Arc<T>, callback: C) -> SipSession<T>
92 where
93 C: SipSessionEventCallback + Send + Sync + 'static,
94 {
95 SipSession {
96 inner: Arc::clone(inner),
97 state: Arc::new(Mutex::new(SipSessionState::Early(EarlySession {
98 dialogs: Vec::new(),
99 }))),
100 callback: Arc::new(Box::new(callback)),
101 }
102 }
103
104 pub fn get_inner(&self) -> Arc<T> {
105 Arc::clone(&self.inner)
106 }
107
108 pub fn setup_early_dialog<C>(&self, dialog: &Arc<SipDialog>, callback: C)
109 where
110 C: SipDialogEventCallbacks + Send + Sync + 'static,
111 {
112 let mut guard = self.state.lock().unwrap();
113 match &mut *guard {
114 SipSessionState::Early(early) => {
115 let callback = dialog.register_user(callback);
116 early.dialogs.push((Arc::clone(dialog), callback));
117 }
118 _ => {}
119 }
120 }
121
122 pub fn setup_confirmed_dialog<C>(&self, dialog: &Arc<SipDialog>, callback: C)
123 where
124 C: SipDialogEventCallbacks + Send + Sync + 'static,
125 {
126 let mut guard = self.state.lock().unwrap();
127 match *guard {
128 SipSessionState::Early(_) => {
129 let callback = dialog.register_user(callback);
130 *guard = SipSessionState::Confirmed(ConfirmedSession {
131 dialog: (Arc::clone(dialog), callback),
132 timeout_counter: Arc::new(Mutex::new(None)),
133 });
134 self.callback.on_event(SipSessionEvent::Started);
135 }
136 _ => {}
137 }
138 }
139
140 pub fn mark_session_active(&self) {
141 let guard = self.state.lock().unwrap();
142 match &*guard {
143 SipSessionState::Confirmed(confirmed) => {
144 let mut guard = confirmed.timeout_counter.lock().unwrap();
145 match &mut *guard {
146 Some(counter) => {
147 counter.mark_active();
148 }
149 _ => {}
150 }
151 }
152 _ => {}
153 }
154 }
155
156 pub fn schedule_refresh(
157 &self,
158 timeout: u32,
159 is_refresher: bool,
160 rt: &Arc<Runtime>, ) {
162 let guard = self.state.lock().unwrap();
163 match &*guard {
164 SipSessionState::Confirmed(confirmed) => {
165 let duration = Duration::from_secs(timeout.into());
166 let mut guard = confirmed.timeout_counter.lock().unwrap();
167 match &mut *guard {
168 Some(counter) => {
169 let timer_counter = Arc::clone(&confirmed.timeout_counter);
170 if let Some(expiration) = counter.mark_expiration(duration, is_refresher) {
171 let (dialog, _) = &confirmed.dialog;
172 schedule(
173 timer_counter,
174 expiration,
175 duration,
176 dialog,
178 rt,
179 &self.callback,
180 );
181 }
182 }
183 None => {
184 let expiration = Instant::now() + duration;
185 *guard = Some(SessionTimeoutCounter::new(expiration, is_refresher));
186 let timer_counter = Arc::clone(&confirmed.timeout_counter);
187 let (dialog, _) = &confirmed.dialog;
188 schedule(
189 timer_counter,
190 expiration,
191 duration,
192 dialog,
194 rt,
195 &self.callback,
196 );
197 }
198 }
199 }
200 _ => {}
201 }
202 }
203
204 pub fn hang_up(&self) {
205 let mut guard = self.state.lock().unwrap();
206 match &*guard {
207 SipSessionState::Early(early) => {
208 for (dialog, callback) in &early.dialogs {
209 if let Some(on_dispose) = dialog.unregister_user(callback) {
214 let dialog = Arc::clone(dialog);
216 on_dispose(dialog);
217 }
218 let dialog = Arc::clone(dialog);
219 self.callback
220 .on_event(SipSessionEvent::HangupBeforeConfirmingDialog(dialog));
221 }
222 *guard = SipSessionState::HungUp;
223 }
224 SipSessionState::Confirmed(confirmed) => {
225 let (dialog, callback) = &confirmed.dialog;
226 if let Some(on_dispose) = dialog.unregister_user(callback) {
231 let dialog = Arc::clone(dialog);
232 on_dispose(dialog);
233 }
234 *guard = SipSessionState::HungUp;
235 }
236 _ => {}
237 }
238 }
239}
240
/// Which side of the session is responsible for refreshing it.
pub enum Refresher {
    /// Selected when the `refresher` parameter reads `uac`.
    UAC,
    /// Selected when the `refresher` parameter reads `uas`.
    UAS,
}
245
/// Session-timer bookkeeping for a confirmed session.
pub struct SessionTimeoutCounter {
    /// Instant of the most recent recorded activity (set at construction and
    /// by `mark_active`).
    active_until: Instant,
    /// Instant at which the session expires unless it is extended.
    expiration: Instant,
    /// Whether this side is the one that has to refresh the session.
    is_refresher: bool,
}
251
252impl SessionTimeoutCounter {
253 pub fn new(expiration: Instant, is_refresher: bool) -> SessionTimeoutCounter {
254 SessionTimeoutCounter {
255 active_until: Instant::now(),
256 expiration,
257 is_refresher,
258 }
259 }
260
261 pub fn mark_active(&mut self) {
262 self.active_until = Instant::now();
263 }
264
265 pub fn expiration(&self) -> Instant {
266 self.expiration
267 }
268
269 pub fn mark_expiration(&mut self, duration: Duration, is_refresher: bool) -> Option<Instant> {
270 let expiration = Instant::now() + duration;
271 self.is_refresher = is_refresher;
272 if self.expiration < expiration {
273 self.expiration = expiration;
274 return Some(expiration);
275 }
276 None
277 }
278
279 pub fn on_refresh_timer(
280 &self,
281 dialog: Arc<SipDialog>,
282 callback: Arc<Box<dyn SipSessionEventCallback + Send + Sync>>,
283 ) {
284 let idle_time = Instant::now() - self.active_until;
285 if idle_time < Duration::from_secs(3600) {
286 if self.is_refresher {
287 callback.on_event(SipSessionEvent::ShouldRefresh(dialog));
288 }
289 }
290 }
291
292 pub fn on_expire_timer(
293 &self,
294 dialog: Arc<SipDialog>,
295 callback: Arc<Box<dyn SipSessionEventCallback + Send + Sync>>,
296 ) {
297 if Instant::now() > self.expiration {
298 callback.on_event(SipSessionEvent::Expired(dialog));
299 }
300 }
301}
302
303fn schedule(
304 timer_counter: Arc<Mutex<Option<SessionTimeoutCounter>>>,
305 expiration: Instant,
306 duration: Duration,
307 dialog: &Arc<SipDialog>,
309 rt: &Arc<Runtime>,
310 callback: &Arc<Box<dyn SipSessionEventCallback + Send + Sync>>,
311) {
312 let duration_refresh = duration - Duration::from_secs(120); let timer_counter_ = Arc::clone(&timer_counter);
314 let dialog_ = Arc::clone(dialog);
315 let callback_ = Arc::clone(callback);
316 rt.spawn(async move {
327 sleep(duration_refresh).await;
328 let timer_counter = timer_counter_;
329 let mut guard = timer_counter.lock().unwrap();
330 match &mut *guard {
331 Some(timer_counter) => {
332 timer_counter.on_refresh_timer(dialog_, callback_);
333 }
334 None => {}
335 }
336 });
337
338 let dialog_ = Arc::clone(dialog);
339 let callback_ = Arc::clone(callback);
340 rt.spawn(async move {
352 sleep(duration).await;
353 let mut guard = timer_counter.lock().unwrap();
354 match &mut *guard {
355 Some(timer_counter) => {
356 if expiration == timer_counter.expiration() {
357 timer_counter.on_expire_timer(dialog_, callback_);
358 }
359 }
360 None => {}
361 }
362 });
363}
364
365pub fn choose_timeout_on_client_transaction_completion(
366 wanted_uac_timeout: Option<u32>,
369 message: &SipMessage,
370) -> Option<(u32, Refresher)> {
371 if let SipMessage::Response(_, Some(resp_headers), _) = message {
372 if let Some(session_expires_header) = header::search(resp_headers, b"Session-Expires", true)
373 {
374 let session_expires_header_field = session_expires_header.get_value().as_header_field();
375 if let Some(session_expires) = session_expires_header_field.as_session_expires() {
376 if let Some(refresher) = session_expires.refresher {
377 if refresher == b"uac" {
378 return Some((session_expires.timeout, Refresher::UAC));
379 } else if refresher == b"uas" {
380 return Some((session_expires.timeout, Refresher::UAS));
381 }
382 }
383 }
384 }
385
386 if let Some(timeout) = wanted_uac_timeout {
398 return Some((timeout, Refresher::UAC));
399 }
400 }
401
402 None
403}
404
405pub fn choose_timeout_for_server_transaction_response(
407 transaction: &Arc<ServerTransaction>,
408 previously_supports_timer: bool,
409 previous_refresher: Refresher,
410) -> Result<Option<(u32, Refresher)>, (u16, &'static [u8], u32)> {
411 if let SipMessage::Request(_, Some(headers), _) = transaction.message() {
412 let mut explicitly_supports_timer = false;
413
414 if let Some(supported_header) = header::search(headers, b"Supported", true) {
415 if supported_header.supports(b"timer") {
416 explicitly_supports_timer = true;
417 }
418 }
419
420 if let Some(session_expires_header) = header::search(headers, b"Session-Expires", true) {
421 let session_expires_header_field = session_expires_header.get_value().as_header_field();
422 if let Some(session_expires) = session_expires_header_field.as_session_expires() {
423 if session_expires.timeout < 90 {
424 return Err((422, b"Session Interval Too Small", 90));
425 } else {
426 if let Some(refresher) = session_expires.refresher {
427 if !explicitly_supports_timer {
428 return Err((400, b"Bad Request", 0));
429 }
430 if refresher == b"uac" {
431 return Ok(Some((session_expires.timeout, Refresher::UAC)));
432 } else if refresher == b"uas" {
433 return Ok(Some((session_expires.timeout, Refresher::UAS)));
434 }
435 }
436
437 if explicitly_supports_timer {
438 return Ok(Some((session_expires.timeout, previous_refresher)));
439 } else if previously_supports_timer {
440 return Ok(Some((session_expires.timeout, Refresher::UAS)));
441 }
442 }
443 }
444 }
445
446 if let Some(min_se_header) = header::search(headers, b"Min-SE", true) {
447 if let Ok(min_se) = min_se_header.get_value().to_int() {
448 if explicitly_supports_timer {
449 return Ok(Some((min_se, previous_refresher)));
450 } else if previously_supports_timer {
451 return Ok(Some((min_se, Refresher::UAS)));
452 }
453 }
454 }
455
456 if explicitly_supports_timer {
457 return Ok(Some((900, previous_refresher)));
458 } else if previously_supports_timer {
459 return Ok(Some((900, Refresher::UAS)));
460 }
461 }
462
463 Ok(None)
464}