1use std::future::Future;
87use std::sync::Arc;
88use std::time::Duration;
89
90use rsip::prelude::UntypedHeader;
91use rsip::Header;
92use rsipstack::dialog::client_dialog::ClientInviteDialog;
93use rsipstack::dialog::server_dialog::ServerInviteDialog;
94use tokio::select;
95use tokio::sync::Notify;
96use tokio_util::sync::CancellationToken;
97use tracing::{debug, info, warn};
98
/// Boxed, thread-safe error type used as the error side of the
/// [`SessionDialogOps`] methods.
type BoxError = Box<dyn std::error::Error + Send + Sync>;
100
/// Smallest session interval, in seconds, this module will run with: the
/// negotiation helpers and the timer loop raise any smaller value to this.
pub const MIN_SESSION_EXPIRES_SECS: u32 = 90;

/// Default session interval, in seconds.
// NOTE(review): not referenced in the visible part of this file; presumably the
// interval callers offer when they have no other preference — confirm.
pub const DEFAULT_SESSION_EXPIRES_SECS: u32 = 1800;

/// Upper bound, in seconds, on how long before the nominal end of the interval
/// the watchdog deadline is placed (see [`SessionTimer::expiry_after`]).
const MAX_EXPIRY_HEADROOM_SECS: u32 = 32;
113
/// Which side of the dialog is responsible for sending session refreshes,
/// as carried in the `refresher` parameter of `Session-Expires`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Refresher {
    /// Spelled `uac` on the wire.
    Uac,
    /// Spelled `uas` on the wire.
    Uas,
}

impl Refresher {
    /// Returns the lowercase wire spelling of this value.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Uac => "uac",
            Self::Uas => "uas",
        }
    }

    /// Parses a `refresher` parameter value, ignoring ASCII case.
    ///
    /// Anything other than `uac` / `uas` yields an error message that quotes
    /// the offending value. No trimming is performed here.
    fn parse(s: &str) -> Result<Self, String> {
        [Self::Uac, Self::Uas]
            .iter()
            .copied()
            .find(|candidate| s.eq_ignore_ascii_case(candidate.as_str()))
            .ok_or_else(|| format!("invalid refresher value {s:?} (want uac|uas)"))
    }
}
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
147pub struct SessionExpires {
148 pub interval_secs: u32,
150 pub refresher: Option<Refresher>,
152}
153
154impl SessionExpires {
155 pub fn parse(value: &str) -> Result<Self, String> {
160 let mut parts = value.split(';');
161 let interval = parts.next().unwrap_or("").trim();
162 let interval_secs: u32 = interval
163 .parse()
164 .map_err(|e| format!("invalid Session-Expires interval {interval:?}: {e}"))?;
165 let mut refresher = None;
166 for param in parts {
167 if let Some((name, val)) = param.split_once('=') {
168 if name.trim().eq_ignore_ascii_case("refresher") {
169 refresher = Some(Refresher::parse(val.trim())?);
170 }
171 }
172 }
173 Ok(Self {
174 interval_secs,
175 refresher,
176 })
177 }
178
179 pub fn header_value(&self) -> String {
181 match self.refresher {
182 Some(r) => format!("{};refresher={}", self.interval_secs, r.as_str()),
183 None => self.interval_secs.to_string(),
184 }
185 }
186
187 pub fn header(&self) -> Header {
190 Header::Other("Session-Expires".into(), self.header_value())
191 }
192}
193
194pub fn supported_timer_header() -> Header {
197 Header::Supported("timer".into())
198}
199
200pub fn require_timer_header() -> Header {
203 Header::Require("timer".into())
204}
205
206fn other_header_value<'a>(headers: &'a rsip::Headers, names: &[&str]) -> Option<&'a str> {
209 headers.iter().find_map(|h| match h {
210 Header::Other(name, value) if names.iter().any(|n| name.eq_ignore_ascii_case(n)) => {
211 Some(value.as_str())
212 }
213 _ => None,
214 })
215}
216
217pub fn session_expires_in(headers: &rsip::Headers) -> Option<SessionExpires> {
221 let raw = other_header_value(headers, &["Session-Expires", "x"])?;
222 match SessionExpires::parse(raw) {
223 Ok(se) => Some(se),
224 Err(e) => {
225 warn!("ignoring malformed Session-Expires header: {e}");
226 None
227 }
228 }
229}
230
231pub fn min_se_in(headers: &rsip::Headers) -> Option<u32> {
234 let raw = other_header_value(headers, &["Min-SE"])?;
235 let interval = raw.split(';').next().unwrap_or("").trim();
236 match interval.parse() {
237 Ok(v) => Some(v),
238 Err(e) => {
239 warn!("ignoring malformed Min-SE header {raw:?}: {e}");
240 None
241 }
242 }
243}
244
/// Reports whether a comma-separated option-tag list contains `timer`.
///
/// Each tag is trimmed and compared ignoring ASCII case; the match must be
/// on the whole tag (so `timers` does not count).
fn has_timer_token(value: &str) -> bool {
    for tag in value.split(',') {
        if tag.trim().eq_ignore_ascii_case("timer") {
            return true;
        }
    }
    false
}
250
251pub fn supports_timer(headers: &rsip::Headers) -> bool {
254 headers.iter().any(|h| match h {
255 Header::Supported(s) => has_timer_token(s.value()),
256 Header::Other(name, value)
257 if name.eq_ignore_ascii_case("Supported") || name.eq_ignore_ascii_case("k") =>
258 {
259 has_timer_token(value)
260 }
261 _ => false,
262 })
263}
264
265#[derive(Debug, Clone, Copy, PartialEq, Eq)]
268pub struct SessionTimer {
269 pub interval_secs: u32,
271 pub we_are_refresher: bool,
274}
275
276impl SessionTimer {
277 pub fn refresh_after(&self) -> Duration {
280 Duration::from_secs(u64::from(self.interval_secs / 2))
281 }
282
283 pub fn expiry_after(&self) -> Duration {
287 let headroom = (self.interval_secs / 3).min(MAX_EXPIRY_HEADROOM_SECS);
288 Duration::from_secs(u64::from(self.interval_secs.saturating_sub(headroom)))
289 }
290}
291
292pub fn negotiate_uac(response_headers: &rsip::Headers) -> Option<SessionTimer> {
305 let se = session_expires_in(response_headers)?;
306 Some(SessionTimer {
307 interval_secs: se.interval_secs.max(MIN_SESSION_EXPIRES_SECS),
308 we_are_refresher: !matches!(se.refresher, Some(Refresher::Uas)),
309 })
310}
311
/// Result of [`negotiate_uas`]: our timer plus what to signal back to the peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UasSessionTimer {
    /// Timer parameters for our (answering) side of the dialog.
    pub timer: SessionTimer,
    /// `Session-Expires` value built from the negotiated interval and
    /// refresher. NOTE(review): presumably meant to be echoed in our 2xx
    /// response — confirm against the caller.
    pub echo: SessionExpires,
    /// `true` when the INVITE advertised `timer` support. NOTE(review):
    /// presumably tells the caller to add `Require: timer` to the response —
    /// confirm against the caller.
    pub require_timer: bool,
}
325
326pub fn negotiate_uas(invite_headers: &rsip::Headers) -> Option<UasSessionTimer> {
341 let se = session_expires_in(invite_headers)?;
342 let floor = min_se_in(invite_headers)
343 .unwrap_or(0)
344 .max(MIN_SESSION_EXPIRES_SECS);
345 let interval_secs = se.interval_secs.max(floor);
346 let peer_supports = supports_timer(invite_headers);
347 let refresher = if peer_supports {
348 se.refresher.unwrap_or(Refresher::Uac)
349 } else {
350 Refresher::Uas
351 };
352 Some(UasSessionTimer {
353 timer: SessionTimer {
354 interval_secs,
355 we_are_refresher: refresher == Refresher::Uas,
356 },
357 echo: SessionExpires {
358 interval_secs,
359 refresher: Some(refresher),
360 },
361 require_timer: peer_supports,
362 })
363}
364
/// The two dialog operations [`session_timer_loop`] needs, abstracted so the
/// loop can run against either dialog type (or a test double).
pub trait SessionDialogOps {
    /// Sends a session refresh carrying the extra `headers` and optional `body`.
    ///
    /// As interpreted by [`session_timer_loop`]: `Ok(Some(resp))` is the
    /// peer's response (judged by its status code), `Ok(None)` means the
    /// dialog is no longer confirmed, and `Err` is a failed refresh.
    fn refresh(
        &self,
        headers: Vec<Header>,
        body: Option<Vec<u8>>,
    ) -> impl Future<Output = Result<Option<rsip::Response>, BoxError>> + Send;

    /// Sends a BYE on the dialog.
    fn send_bye(&self) -> impl Future<Output = Result<(), BoxError>> + Send;
}
383
384impl SessionDialogOps for ClientInviteDialog {
385 async fn refresh(
386 &self,
387 headers: Vec<Header>,
388 body: Option<Vec<u8>>,
389 ) -> Result<Option<rsip::Response>, BoxError> {
390 Ok(self.reinvite(Some(headers), body).await?)
391 }
392
393 async fn send_bye(&self) -> Result<(), BoxError> {
394 Ok(self.bye().await?)
395 }
396}
397
398impl SessionDialogOps for ServerInviteDialog {
399 async fn refresh(
400 &self,
401 headers: Vec<Header>,
402 body: Option<Vec<u8>>,
403 ) -> Result<Option<rsip::Response>, BoxError> {
404 Ok(self.reinvite(Some(headers), body).await?)
405 }
406
407 async fn send_bye(&self) -> Result<(), BoxError> {
408 Ok(self.bye().await?)
409 }
410}
411
/// Why [`session_timer_loop`] returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTimerOutcome {
    /// The cancellation token fired; no BYE was sent by the loop.
    Cancelled,
    /// We were the watchdog and the deadline passed without a peer refresh;
    /// a BYE was attempted.
    Expired,
    /// We were the refresher and a refresh was rejected (non-2xx response) or
    /// returned an error; a BYE was attempted.
    RefreshFailed,
    /// We were the refresher and the refresh call returned `Ok(None)`; the
    /// loop stopped without sending a BYE.
    DialogGone,
}
428
429pub async fn session_timer_loop<D: SessionDialogOps>(
448 dialog: &D,
449 timer: SessionTimer,
450 refresh_body: Option<Vec<u8>>,
451 peer_refreshed: Arc<Notify>,
452 cancel: CancellationToken,
453) -> SessionTimerOutcome {
454 let mut interval_secs = timer.interval_secs.max(MIN_SESSION_EXPIRES_SECS);
455 if timer.we_are_refresher {
456 loop {
457 let current = SessionTimer {
458 interval_secs,
459 we_are_refresher: true,
460 };
461 select! {
462 _ = tokio::time::sleep(current.refresh_after()) => {}
463 _ = cancel.cancelled() => return SessionTimerOutcome::Cancelled,
464 }
465
466 let headers = vec![
469 supported_timer_header(),
470 SessionExpires {
471 interval_secs,
472 refresher: Some(Refresher::Uac),
473 }
474 .header(),
475 ];
476 match dialog.refresh(headers, refresh_body.clone()).await {
477 Ok(Some(resp)) if resp.status_code.kind() == rsip::StatusCodeKind::Successful => {
478 if let Some(granted) = session_expires_in(&resp.headers) {
481 interval_secs = granted.interval_secs.max(MIN_SESSION_EXPIRES_SECS);
482 }
483 debug!(interval_secs, "session refresh accepted");
484 }
485 Ok(Some(resp)) => {
486 warn!(
487 status = %resp.status_code,
488 "session refresh rejected; hanging up"
489 );
490 if let Err(e) = dialog.send_bye().await {
491 warn!("BYE after rejected refresh failed: {e}");
492 }
493 return SessionTimerOutcome::RefreshFailed;
494 }
495 Ok(None) => {
496 debug!("dialog no longer confirmed; session timer standing down");
497 return SessionTimerOutcome::DialogGone;
498 }
499 Err(e) => {
500 warn!("session refresh error: {e}; hanging up");
501 if let Err(e) = dialog.send_bye().await {
502 warn!("BYE after failed refresh failed: {e}");
503 }
504 return SessionTimerOutcome::RefreshFailed;
505 }
506 }
507 }
508 } else {
509 let current = SessionTimer {
510 interval_secs,
511 we_are_refresher: false,
512 };
513 loop {
514 select! {
515 _ = tokio::time::sleep(current.expiry_after()) => {
516 info!(
517 interval_secs,
518 "session lapsed without refresh; sending BYE"
519 );
520 if let Err(e) = dialog.send_bye().await {
521 warn!("BYE after session expiry failed: {e}");
522 }
523 return SessionTimerOutcome::Expired;
524 }
525 _ = peer_refreshed.notified() => {
526 debug!("peer refreshed session; watchdog deadline reset");
527 }
528 _ = cancel.cancelled() => return SessionTimerOutcome::Cancelled,
529 }
530 }
531 }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    // ---- shared fixtures ---------------------------------------------------

    /// Collects individual headers into an `rsip::Headers`.
    fn headers(items: Vec<Header>) -> rsip::Headers {
        let mut collected = rsip::Headers::default();
        items.into_iter().for_each(|item| {
            collected.push(item);
        });
        collected
    }

    /// Shorthand for an untyped header.
    fn other(name: &str, value: &str) -> Header {
        Header::Other(name.into(), value.into())
    }

    /// Shorthand for a `SessionTimer` literal.
    fn timer(interval_secs: u32, we_are_refresher: bool) -> SessionTimer {
        SessionTimer {
            interval_secs,
            we_are_refresher,
        }
    }

    /// INVITE headers with `Session-Expires`, optionally `Min-SE`, and
    /// optionally `Supported: timer` (in that order).
    fn invite_headers(session_expires: &str, min_se: Option<&str>, timer: bool) -> rsip::Headers {
        let session_expires = other("Session-Expires", session_expires);
        let min_se = min_se.map(|floor| other("Min-SE", floor));
        let supported = timer.then(supported_timer_header);
        headers(
            std::iter::once(session_expires)
                .chain(min_se)
                .chain(supported)
                .collect(),
        )
    }

    // ---- SessionExpires parsing / rendering --------------------------------

    #[test]
    fn parse_bare_interval() {
        let parsed = SessionExpires::parse("1800").unwrap();
        assert_eq!(
            parsed,
            SessionExpires {
                interval_secs: 1800,
                refresher: None
            }
        );
    }

    #[test]
    fn parse_with_refresher_param() {
        let first = SessionExpires::parse("1800;refresher=uas").unwrap();
        assert_eq!(
            (first.interval_secs, first.refresher),
            (1800, Some(Refresher::Uas))
        );
        let second = SessionExpires::parse("90;refresher=uac").unwrap();
        assert_eq!(second.refresher, Some(Refresher::Uac));
    }

    #[test]
    fn parse_is_case_insensitive_and_whitespace_tolerant() {
        let parsed = SessionExpires::parse(" 600 ; Refresher = UAS ").unwrap();
        assert_eq!(
            (parsed.interval_secs, parsed.refresher),
            (600, Some(Refresher::Uas))
        );
    }

    #[test]
    fn parse_ignores_unknown_params() {
        let parsed = SessionExpires::parse("1800;foo=bar;refresher=uac;baz").unwrap();
        assert_eq!(parsed.refresher, Some(Refresher::Uac));
    }

    #[test]
    fn parse_rejects_garbage() {
        for bad in ["", "soon", "1800;refresher=bogus", "-5"] {
            assert!(SessionExpires::parse(bad).is_err());
        }
    }

    #[test]
    fn header_value_round_trips() {
        let cases = [
            (1800, None),
            (90, Some(Refresher::Uac)),
            (7200, Some(Refresher::Uas)),
        ];
        for (interval_secs, refresher) in cases {
            let original = SessionExpires {
                interval_secs,
                refresher,
            };
            let reparsed = SessionExpires::parse(&original.header_value()).unwrap();
            assert_eq!(
                reparsed,
                original,
                "round-trip via {:?}",
                original.header_value()
            );
        }
    }

    #[test]
    fn header_builds_untyped_session_expires() {
        let value = SessionExpires {
            interval_secs: 1800,
            refresher: Some(Refresher::Uac),
        };
        let rendered = value.header().to_string();
        assert_eq!(rendered, "Session-Expires: 1800;refresher=uac");
    }

    // ---- header lookups ----------------------------------------------------

    #[test]
    fn session_expires_in_finds_header_case_insensitively() {
        let hdrs = headers(vec![other("session-expires", "600;refresher=uas")]);
        let found = session_expires_in(&hdrs).unwrap();
        assert_eq!(found.interval_secs, 600);
        assert_eq!(found.refresher, Some(Refresher::Uas));
    }

    #[test]
    fn session_expires_in_accepts_compact_form() {
        let hdrs = headers(vec![other("x", "300")]);
        let expected = SessionExpires {
            interval_secs: 300,
            refresher: None,
        };
        assert_eq!(session_expires_in(&hdrs), Some(expected));
    }

    #[test]
    fn session_expires_in_absent_or_malformed_is_none() {
        assert_eq!(session_expires_in(&headers(vec![])), None);
        let malformed = headers(vec![other("Session-Expires", "soon")]);
        assert_eq!(session_expires_in(&malformed), None);
    }

    #[test]
    fn min_se_in_parses_and_ignores_params() {
        let cases = [
            ("Min-SE", "120", Some(120)),
            ("min-se", "240;lr", Some(240)),
            ("Min-SE", "never", None),
        ];
        for (name, value, expected) in cases {
            let hdrs = headers(vec![other(name, value)]);
            assert_eq!(min_se_in(&hdrs), expected);
        }
        assert_eq!(min_se_in(&headers(vec![])), None);
    }

    #[test]
    fn supports_timer_scans_typed_untyped_and_compact() {
        let advertising = [
            supported_timer_header(),
            Header::Supported("100rel, timer".into()),
            other("k", "timer"),
            other("Supported", "TIMER"),
        ];
        for header in advertising {
            assert!(supports_timer(&headers(vec![header])));
        }

        assert!(!supports_timer(&headers(vec![])));
        for tags in ["100rel", "timers"] {
            let hdrs = headers(vec![Header::Supported(tags.into())]);
            assert!(!supports_timer(&hdrs));
        }
    }

    // ---- SessionTimer arithmetic -------------------------------------------

    #[test]
    fn refresh_fires_at_half_the_interval() {
        for (interval_secs, expected_secs) in [(1800, 900), (90, 45)] {
            assert_eq!(
                timer(interval_secs, true).refresh_after(),
                Duration::from_secs(expected_secs)
            );
        }
    }

    #[test]
    fn expiry_keeps_min_of_32s_or_a_third_headroom() {
        for (interval_secs, expected_secs) in [(1800, 1768), (90, 60)] {
            assert_eq!(
                timer(interval_secs, false).expiry_after(),
                Duration::from_secs(expected_secs)
            );
        }
    }

    // ---- UAC negotiation ---------------------------------------------------

    #[test]
    fn uac_no_session_expires_means_no_timer() {
        let negotiated = negotiate_uac(&headers(vec![]));
        assert_eq!(negotiated, None);
    }

    #[test]
    fn uac_refresher_uas_means_peer_refreshes() {
        let hdrs = headers(vec![other("Session-Expires", "1800;refresher=uas")]);
        assert_eq!(negotiate_uac(&hdrs), Some(timer(1800, false)));
    }

    #[test]
    fn uac_refresher_uac_or_missing_means_we_refresh() {
        for value in ["600;refresher=uac", "600"] {
            let hdrs = headers(vec![other("Session-Expires", value)]);
            assert!(negotiate_uac(&hdrs).unwrap().we_are_refresher);
        }
    }

    #[test]
    fn uac_floors_tiny_grants_at_90s() {
        let hdrs = headers(vec![other("Session-Expires", "20;refresher=uac")]);
        let negotiated = negotiate_uac(&hdrs).unwrap();
        assert_eq!(negotiated.interval_secs, 90);
    }

    // ---- UAS negotiation ---------------------------------------------------

    #[test]
    fn uas_no_session_expires_means_no_timer() {
        let hdrs = headers(vec![supported_timer_header()]);
        assert_eq!(negotiate_uas(&hdrs), None);
    }

    #[test]
    fn uas_default_makes_supporting_peer_the_refresher() {
        let negotiated = negotiate_uas(&invite_headers("1800", None, true)).unwrap();
        assert_eq!(negotiated.timer.interval_secs, 1800);
        assert!(
            !negotiated.timer.we_are_refresher,
            "peer (UAC) should refresh"
        );
        let expected_echo = SessionExpires {
            interval_secs: 1800,
            refresher: Some(Refresher::Uac),
        };
        assert_eq!(negotiated.echo, expected_echo);
        assert!(negotiated.require_timer);
    }

    #[test]
    fn uas_honors_requested_refresher_uas() {
        let negotiated = negotiate_uas(&invite_headers("1800;refresher=uas", None, true)).unwrap();
        assert!(
            negotiated.timer.we_are_refresher,
            "we (UAS) were asked to refresh"
        );
        assert_eq!(negotiated.echo.refresher, Some(Refresher::Uas));
    }

    #[test]
    fn uas_without_peer_support_takes_refresher_role() {
        let negotiated = negotiate_uas(&invite_headers("1800;refresher=uac", None, false)).unwrap();
        assert!(negotiated.timer.we_are_refresher);
        assert_eq!(negotiated.echo.refresher, Some(Refresher::Uas));
        assert!(!negotiated.require_timer);
    }

    #[test]
    fn uas_floors_interval_at_min_se_and_90s() {
        let with_min_se = negotiate_uas(&invite_headers("30", Some("120"), true)).unwrap();
        assert_eq!(with_min_se.timer.interval_secs, 120);
        assert_eq!(with_min_se.echo.interval_secs, 120);

        let without_min_se = negotiate_uas(&invite_headers("30", None, true)).unwrap();
        assert_eq!(without_min_se.timer.interval_secs, 90);
    }

    // ---- test double for the timer loop ------------------------------------

    /// What the mock dialog was asked to do.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum Event {
        Refresh { session_expires: String },
        Bye,
    }

    /// Records every call with its (paused-clock) timestamp and answers
    /// refreshes from a scripted list, front first.
    struct MockDialog {
        events: Mutex<Vec<(Duration, Event)>>,
        refresh_replies: Mutex<Vec<Result<Option<rsip::Response>, String>>>,
        started: tokio::time::Instant,
    }

    impl MockDialog {
        fn new(refresh_replies: Vec<Result<Option<rsip::Response>, String>>) -> Self {
            let started = tokio::time::Instant::now();
            Self {
                events: Mutex::new(Vec::new()),
                refresh_replies: Mutex::new(refresh_replies),
                started,
            }
        }

        /// Snapshot of everything recorded so far.
        fn events(&self) -> Vec<(Duration, Event)> {
            let recorded = self.events.lock().unwrap();
            recorded.clone()
        }

        /// Appends `event`, stamped with the time elapsed since construction.
        fn record(&self, event: Event) {
            let at = self.started.elapsed();
            self.events.lock().unwrap().push((at, event));
        }
    }

    /// Builds a bodiless response with the given status code and headers.
    fn response(code: u16, extra: Vec<Header>) -> rsip::Response {
        let status_code = rsip::StatusCode::from(code);
        rsip::Response {
            status_code,
            version: rsip::Version::V2,
            headers: headers(extra),
            body: Vec::new(),
        }
    }

    impl SessionDialogOps for MockDialog {
        async fn refresh(
            &self,
            hdrs: Vec<Header>,
            _body: Option<Vec<u8>>,
        ) -> Result<Option<rsip::Response>, BoxError> {
            // Remember which Session-Expires value the loop offered.
            let mut session_expires = String::new();
            for h in &hdrs {
                if let Header::Other(name, value) = h {
                    if name == "Session-Expires" {
                        session_expires = value.clone();
                        break;
                    }
                }
            }
            self.record(Event::Refresh { session_expires });

            let reply = self.refresh_replies.lock().unwrap().remove(0);
            reply.map_err(Into::into)
        }

        async fn send_bye(&self) -> Result<(), BoxError> {
            self.record(Event::Bye);
            Ok(())
        }
    }

    /// Runs the loop to completion with a fresh notifier and a token that is
    /// never cancelled.
    async fn run_to_completion(dialog: &MockDialog, timer: SessionTimer) -> SessionTimerOutcome {
        session_timer_loop(
            dialog,
            timer,
            None,
            Arc::new(Notify::new()),
            CancellationToken::new(),
        )
        .await
    }

    // ---- refresher role ----------------------------------------------------

    #[tokio::test(start_paused = true)]
    async fn refresher_sends_refresh_every_half_interval() {
        let dialog = MockDialog::new(vec![
            Ok(Some(response(200, vec![]))),
            Ok(Some(response(200, vec![]))),
            Ok(None),
        ]);
        let outcome = run_to_completion(&dialog, timer(180, true)).await;
        assert_eq!(outcome, SessionTimerOutcome::DialogGone);

        let events = dialog.events();
        assert_eq!(events.len(), 3);
        let stamps: Vec<Duration> = events.iter().map(|(at, _)| *at).collect();
        assert_eq!(stamps[0], Duration::from_secs(90));
        assert_eq!(stamps[1], Duration::from_secs(180));
        assert_eq!(stamps[2], Duration::from_secs(270));

        let expected = Event::Refresh {
            session_expires: "180;refresher=uac".into(),
        };
        for (_, event) in &events {
            assert_eq!(event, &expected);
        }
    }

    #[tokio::test(start_paused = true)]
    async fn refresher_adopts_interval_granted_in_refresh_response() {
        let regrant = response(200, vec![other("Session-Expires", "360;refresher=uac")]);
        let dialog = MockDialog::new(vec![Ok(Some(regrant)), Ok(None)]);
        let outcome = run_to_completion(&dialog, timer(180, true)).await;
        assert_eq!(outcome, SessionTimerOutcome::DialogGone);

        let events = dialog.events();
        assert_eq!(events[0].0, Duration::from_secs(90), "first at 180/2");
        assert_eq!(
            events[1].0,
            Duration::from_secs(90 + 180),
            "second at 90 + 360/2 after the re-grant"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn refresher_rejected_refresh_sends_bye() {
        let dialog = MockDialog::new(vec![Ok(Some(response(481, vec![])))]);
        let outcome = run_to_completion(&dialog, timer(180, true)).await;
        assert_eq!(outcome, SessionTimerOutcome::RefreshFailed);

        let events = dialog.events();
        assert!(matches!(events[0].1, Event::Refresh { .. }));
        assert_eq!(events[1].1, Event::Bye);
    }

    #[tokio::test(start_paused = true)]
    async fn refresher_transport_error_sends_bye() {
        let dialog = MockDialog::new(vec![Err("socket closed".into())]);
        let outcome = run_to_completion(&dialog, timer(180, true)).await;
        assert_eq!(outcome, SessionTimerOutcome::RefreshFailed);
        assert_eq!(dialog.events().last().unwrap().1, Event::Bye);
    }

    #[tokio::test(start_paused = true)]
    async fn refresher_cancellation_wins_before_first_refresh() {
        let dialog = MockDialog::new(vec![]);
        let cancel = CancellationToken::new();
        cancel.cancel();
        let outcome = session_timer_loop(
            &dialog,
            timer(180, true),
            None,
            Arc::new(Notify::new()),
            cancel,
        )
        .await;
        assert_eq!(outcome, SessionTimerOutcome::Cancelled);
        assert!(dialog.events().is_empty(), "no refresh, no BYE");
    }

    // ---- watchdog role -----------------------------------------------------

    #[tokio::test(start_paused = true)]
    async fn watchdog_sends_bye_when_session_lapses() {
        let dialog = MockDialog::new(vec![]);
        let outcome = run_to_completion(&dialog, timer(90, false)).await;
        assert_eq!(outcome, SessionTimerOutcome::Expired);

        let events = dialog.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0], (Duration::from_secs(60), Event::Bye));
    }

    #[tokio::test(start_paused = true)]
    async fn watchdog_resets_deadline_on_peer_refresh() {
        let dialog = Arc::new(MockDialog::new(vec![]));
        let cancel = CancellationToken::new();
        let refreshed = Arc::new(Notify::new());

        let task_dialog = Arc::clone(&dialog);
        let task_refreshed = Arc::clone(&refreshed);
        let task_cancel = cancel.clone();
        let loop_task = tokio::spawn(async move {
            session_timer_loop(
                &*task_dialog,
                timer(90, false),
                None,
                task_refreshed,
                task_cancel,
            )
            .await
        });

        // One second before the 60 s deadline the peer refreshes ...
        tokio::time::sleep(Duration::from_secs(59)).await;
        refreshed.notify_one();
        tokio::task::yield_now().await;
        // ... so 30 s later (t = 89 s) nothing must have been sent yet.
        tokio::time::sleep(Duration::from_secs(30)).await;
        assert!(dialog.events().is_empty(), "deadline should have reset");

        // The restarted deadline then lapses at 59 s + 60 s.
        let outcome = loop_task.await.unwrap();
        assert_eq!(outcome, SessionTimerOutcome::Expired);
        assert_eq!(
            dialog.events(),
            vec![(Duration::from_secs(119), Event::Bye)]
        );
    }

    #[tokio::test(start_paused = true)]
    async fn watchdog_cancellation_stands_down_without_bye() {
        let dialog = Arc::new(MockDialog::new(vec![]));
        let cancel = CancellationToken::new();

        let task_dialog = Arc::clone(&dialog);
        let task_cancel = cancel.clone();
        let loop_task = tokio::spawn(async move {
            session_timer_loop(
                &*task_dialog,
                timer(90, false),
                None,
                Arc::new(Notify::new()),
                task_cancel,
            )
            .await
        });

        tokio::time::sleep(Duration::from_secs(10)).await;
        cancel.cancel();
        assert_eq!(loop_task.await.unwrap(), SessionTimerOutcome::Cancelled);
        assert!(dialog.events().is_empty());
    }
}