1use std::future::Future;
58use std::sync::Arc;
59use std::time::Duration;
60
61use rsip::prelude::UntypedHeader;
62use rsip::Header;
63use tokio::select;
64use tokio::sync::Notify;
65use tokio_util::sync::CancellationToken;
66use tracing::{debug, info, warn};
67
/// Boxed, thread-safe error type returned by the fallible [`SessionDialogOps`] methods.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Lower bound, in seconds, that this module clamps every negotiated session interval to.
// NOTE(review): 90 presumably mirrors the minimum Session-Expires value of RFC 4028 — confirm.
pub const MIN_SESSION_EXPIRES_SECS: u32 = 90;

/// Default session interval in seconds.
// NOTE(review): not referenced by the code visible here; presumably used by callers when
// offering a session timer — confirm.
pub const DEFAULT_SESSION_EXPIRES_SECS: u32 = 1800;

/// Largest headroom, in seconds, that [`SessionTimer::expiry_after`] subtracts from the interval.
const MAX_EXPIRY_HEADROOM_SECS: u32 = 32;
82
/// Value of the `refresher` parameter of a `Session-Expires` header: which
/// side of the dialog is responsible for sending session refreshes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Refresher {
    /// The `uac` role.
    Uac,
    /// The `uas` role.
    Uas,
}

impl Refresher {
    /// Returns the lowercase wire token for this role (`"uac"` or `"uas"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Uac => "uac",
            Self::Uas => "uas",
        }
    }

    /// Parses a `refresher` parameter value, ignoring ASCII case.
    ///
    /// The input is compared as-is (no trimming is done here).
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when `s` is neither `uac` nor `uas`.
    fn parse(s: &str) -> Result<Self, String> {
        // Try each known role against its own wire token, `uac` first.
        [Self::Uac, Self::Uas]
            .iter()
            .copied()
            .find(|role| s.eq_ignore_ascii_case(role.as_str()))
            .ok_or_else(|| format!("invalid refresher value {s:?} (want uac|uas)"))
    }
}
112
113#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub struct SessionExpires {
117 pub interval_secs: u32,
119 pub refresher: Option<Refresher>,
121}
122
123impl SessionExpires {
124 pub fn parse(value: &str) -> Result<Self, String> {
129 let mut parts = value.split(';');
130 let interval = parts.next().unwrap_or("").trim();
131 let interval_secs: u32 = interval
132 .parse()
133 .map_err(|e| format!("invalid Session-Expires interval {interval:?}: {e}"))?;
134 let mut refresher = None;
135 for param in parts {
136 if let Some((name, val)) = param.split_once('=') {
137 if name.trim().eq_ignore_ascii_case("refresher") {
138 refresher = Some(Refresher::parse(val.trim())?);
139 }
140 }
141 }
142 Ok(Self {
143 interval_secs,
144 refresher,
145 })
146 }
147
148 pub fn header_value(&self) -> String {
150 match self.refresher {
151 Some(r) => format!("{};refresher={}", self.interval_secs, r.as_str()),
152 None => self.interval_secs.to_string(),
153 }
154 }
155
156 pub fn header(&self) -> Header {
159 Header::Other("Session-Expires".into(), self.header_value())
160 }
161}
162
163pub fn supported_timer_header() -> Header {
166 Header::Supported("timer".into())
167}
168
169pub fn require_timer_header() -> Header {
172 Header::Require("timer".into())
173}
174
175fn other_header_value<'a>(headers: &'a rsip::Headers, names: &[&str]) -> Option<&'a str> {
178 headers.iter().find_map(|h| match h {
179 Header::Other(name, value) if names.iter().any(|n| name.eq_ignore_ascii_case(n)) => {
180 Some(value.as_str())
181 }
182 _ => None,
183 })
184}
185
186pub fn session_expires_in(headers: &rsip::Headers) -> Option<SessionExpires> {
190 let raw = other_header_value(headers, &["Session-Expires", "x"])?;
191 match SessionExpires::parse(raw) {
192 Ok(se) => Some(se),
193 Err(e) => {
194 warn!("ignoring malformed Session-Expires header: {e}");
195 None
196 }
197 }
198}
199
200pub fn min_se_in(headers: &rsip::Headers) -> Option<u32> {
203 let raw = other_header_value(headers, &["Min-SE"])?;
204 let interval = raw.split(';').next().unwrap_or("").trim();
205 match interval.parse() {
206 Ok(v) => Some(v),
207 Err(e) => {
208 warn!("ignoring malformed Min-SE header {raw:?}: {e}");
209 None
210 }
211 }
212}
213
/// Reports whether a comma-separated option-tag list contains `timer`.
///
/// Each tag is trimmed and compared ignoring ASCII case; only a whole-tag
/// match counts (`timers` does not match).
fn has_timer_token(value: &str) -> bool {
    for tag in value.split(',') {
        if tag.trim().eq_ignore_ascii_case("timer") {
            return true;
        }
    }
    false
}
219
220pub fn supports_timer(headers: &rsip::Headers) -> bool {
223 headers.iter().any(|h| match h {
224 Header::Supported(s) => has_timer_token(s.value()),
225 Header::Other(name, value)
226 if name.eq_ignore_ascii_case("Supported") || name.eq_ignore_ascii_case("k") =>
227 {
228 has_timer_token(value)
229 }
230 _ => false,
231 })
232}
233
234#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub struct SessionTimer {
238 pub interval_secs: u32,
240 pub we_are_refresher: bool,
243}
244
245impl SessionTimer {
246 pub fn refresh_after(&self) -> Duration {
249 Duration::from_secs(u64::from(self.interval_secs / 2))
250 }
251
252 pub fn expiry_after(&self) -> Duration {
256 let headroom = (self.interval_secs / 3).min(MAX_EXPIRY_HEADROOM_SECS);
257 Duration::from_secs(u64::from(self.interval_secs.saturating_sub(headroom)))
258 }
259}
260
261pub fn negotiate_uac(response_headers: &rsip::Headers) -> Option<SessionTimer> {
274 let se = session_expires_in(response_headers)?;
275 Some(SessionTimer {
276 interval_secs: se.interval_secs.max(MIN_SESSION_EXPIRES_SECS),
277 we_are_refresher: !matches!(se.refresher, Some(Refresher::Uas)),
278 })
279}
280
/// Result of [`negotiate_uas`]: the local timer plus what to put in the response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UasSessionTimer {
    /// Timer parameters for the local (answering) side.
    pub timer: SessionTimer,
    /// `Session-Expires` value with the negotiated interval and refresher.
    // NOTE(review): presumably echoed back in the 2xx response — confirm against the caller.
    pub echo: SessionExpires,
    /// `true` when the request advertised `timer` support (see [`supports_timer`]).
    // NOTE(review): presumably means the response should carry `Require: timer` — confirm.
    pub require_timer: bool,
}
294
295pub fn negotiate_uas(invite_headers: &rsip::Headers) -> Option<UasSessionTimer> {
310 let se = session_expires_in(invite_headers)?;
311 let floor = min_se_in(invite_headers)
312 .unwrap_or(0)
313 .max(MIN_SESSION_EXPIRES_SECS);
314 let interval_secs = se.interval_secs.max(floor);
315 let peer_supports = supports_timer(invite_headers);
316 let refresher = if peer_supports {
317 se.refresher.unwrap_or(Refresher::Uac)
318 } else {
319 Refresher::Uas
320 };
321 Some(UasSessionTimer {
322 timer: SessionTimer {
323 interval_secs,
324 we_are_refresher: refresher == Refresher::Uas,
325 },
326 echo: SessionExpires {
327 interval_secs,
328 refresher: Some(refresher),
329 },
330 require_timer: peer_supports,
331 })
332}
333
/// Dialog operations that [`session_timer_loop`] needs from the surrounding call machinery.
pub trait SessionDialogOps {
    /// Sends a session refresh carrying `headers` and the optional `body`.
    ///
    /// [`session_timer_loop`] interprets the result as follows: `Ok(Some(resp))`
    /// with a successful status means the refresh was accepted, `Ok(Some(resp))`
    /// with any other status means it was rejected, `Ok(None)` means the dialog
    /// is no longer confirmed, and `Err` means the refresh could not be completed.
    // NOTE(review): the request method used for the refresh is up to the implementor.
    fn refresh(
        &self,
        headers: Vec<Header>,
        body: Option<Vec<u8>>,
    ) -> impl Future<Output = Result<Option<rsip::Response>, BoxError>> + Send;

    /// Sends a BYE to end the dialog.
    fn send_bye(&self) -> impl Future<Output = Result<(), BoxError>> + Send;
}
352
/// Reason [`session_timer_loop`] returned.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionTimerOutcome {
    /// The cancellation token fired; no BYE was sent by the loop.
    Cancelled,
    /// The watchdog deadline passed without a peer refresh; a BYE was attempted.
    Expired,
    /// A refresh was rejected or errored; a BYE was attempted.
    RefreshFailed,
    /// `refresh` returned `Ok(None)`, i.e. the dialog is no longer confirmed; no BYE was sent.
    DialogGone,
}
369
370pub async fn session_timer_loop<D: SessionDialogOps>(
388 dialog: &D,
389 timer: SessionTimer,
390 refresh_body: Option<Vec<u8>>,
391 peer_refreshed: Arc<Notify>,
392 cancel: CancellationToken,
393) -> SessionTimerOutcome {
394 let mut interval_secs = timer.interval_secs.max(MIN_SESSION_EXPIRES_SECS);
395 if timer.we_are_refresher {
396 loop {
397 let current = SessionTimer {
398 interval_secs,
399 we_are_refresher: true,
400 };
401 select! {
402 _ = tokio::time::sleep(current.refresh_after()) => {}
403 _ = cancel.cancelled() => return SessionTimerOutcome::Cancelled,
404 }
405
406 let headers = vec![
409 supported_timer_header(),
410 SessionExpires {
411 interval_secs,
412 refresher: Some(Refresher::Uac),
413 }
414 .header(),
415 ];
416 match dialog.refresh(headers, refresh_body.clone()).await {
417 Ok(Some(resp)) if resp.status_code.kind() == rsip::StatusCodeKind::Successful => {
418 if let Some(granted) = session_expires_in(&resp.headers) {
421 interval_secs = granted.interval_secs.max(MIN_SESSION_EXPIRES_SECS);
422 }
423 debug!(interval_secs, "session refresh accepted");
424 }
425 Ok(Some(resp)) => {
426 warn!(
427 status = %resp.status_code,
428 "session refresh rejected; hanging up"
429 );
430 if let Err(e) = dialog.send_bye().await {
431 warn!("BYE after rejected refresh failed: {e}");
432 }
433 return SessionTimerOutcome::RefreshFailed;
434 }
435 Ok(None) => {
436 debug!("dialog no longer confirmed; session timer standing down");
437 return SessionTimerOutcome::DialogGone;
438 }
439 Err(e) => {
440 warn!("session refresh error: {e}; hanging up");
441 if let Err(e) = dialog.send_bye().await {
442 warn!("BYE after failed refresh failed: {e}");
443 }
444 return SessionTimerOutcome::RefreshFailed;
445 }
446 }
447 }
448 } else {
449 let current = SessionTimer {
450 interval_secs,
451 we_are_refresher: false,
452 };
453 loop {
454 select! {
455 _ = tokio::time::sleep(current.expiry_after()) => {
456 info!(
457 interval_secs,
458 "session lapsed without refresh; sending BYE"
459 );
460 if let Err(e) = dialog.send_bye().await {
461 warn!("BYE after session expiry failed: {e}");
462 }
463 return SessionTimerOutcome::Expired;
464 }
465 _ = peer_refreshed.notified() => {
466 debug!("peer refreshed session; watchdog deadline reset");
467 }
468 _ = cancel.cancelled() => return SessionTimerOutcome::Cancelled,
469 }
470 }
471 }
472}
473
// Unit tests for header parsing, timer negotiation and the async timer loop.
// The async tests run on tokio's paused clock, so the asserted timestamps are
// exact virtual-time offsets from the moment the mock dialog was created.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    // A bare number parses as the interval with no refresher.
    #[test]
    fn parse_bare_interval() {
        let se = SessionExpires::parse("1800").unwrap();
        assert_eq!(se.interval_secs, 1800);
        assert_eq!(se.refresher, None);
    }

    // The `refresher` parameter is picked up for both roles.
    #[test]
    fn parse_with_refresher_param() {
        let se = SessionExpires::parse("1800;refresher=uas").unwrap();
        assert_eq!(se.interval_secs, 1800);
        assert_eq!(se.refresher, Some(Refresher::Uas));
        let se = SessionExpires::parse("90;refresher=uac").unwrap();
        assert_eq!(se.refresher, Some(Refresher::Uac));
    }

    // Parameter name/value case and surrounding whitespace do not matter.
    #[test]
    fn parse_is_case_insensitive_and_whitespace_tolerant() {
        let se = SessionExpires::parse(" 600 ; Refresher = UAS ").unwrap();
        assert_eq!(se.interval_secs, 600);
        assert_eq!(se.refresher, Some(Refresher::Uas));
    }

    // Unknown parameters, with or without `=`, are skipped.
    #[test]
    fn parse_ignores_unknown_params() {
        let se = SessionExpires::parse("1800;foo=bar;refresher=uac;baz").unwrap();
        assert_eq!(se.refresher, Some(Refresher::Uac));
    }

    // Empty, non-numeric and negative intervals, and unknown refresher values, are errors.
    #[test]
    fn parse_rejects_garbage() {
        assert!(SessionExpires::parse("").is_err());
        assert!(SessionExpires::parse("soon").is_err());
        assert!(SessionExpires::parse("1800;refresher=bogus").is_err());
        assert!(SessionExpires::parse("-5").is_err());
    }

    // `header_value` output parses back to the same struct.
    #[test]
    fn header_value_round_trips() {
        for se in [
            SessionExpires {
                interval_secs: 1800,
                refresher: None,
            },
            SessionExpires {
                interval_secs: 90,
                refresher: Some(Refresher::Uac),
            },
            SessionExpires {
                interval_secs: 7200,
                refresher: Some(Refresher::Uas),
            },
        ] {
            let parsed = SessionExpires::parse(&se.header_value()).unwrap();
            assert_eq!(parsed, se, "round-trip via {:?}", se.header_value());
        }
    }

    // `header()` renders as an untyped `Session-Expires` header line.
    #[test]
    fn header_builds_untyped_session_expires() {
        let h = SessionExpires {
            interval_secs: 1800,
            refresher: Some(Refresher::Uac),
        }
        .header();
        assert_eq!(h.to_string(), "Session-Expires: 1800;refresher=uac");
    }

    /// Builds an `rsip::Headers` collection from a list of headers.
    fn headers(items: Vec<Header>) -> rsip::Headers {
        let mut h = rsip::Headers::default();
        for item in items {
            h.push(item);
        }
        h
    }

    // Header-name lookup ignores ASCII case.
    #[test]
    fn session_expires_in_finds_header_case_insensitively() {
        let h = headers(vec![Header::Other(
            "session-expires".into(),
            "600;refresher=uas".into(),
        )]);
        let se = session_expires_in(&h).unwrap();
        assert_eq!(se.interval_secs, 600);
        assert_eq!(se.refresher, Some(Refresher::Uas));
    }

    // The compact header name `x` is accepted as `Session-Expires`.
    #[test]
    fn session_expires_in_accepts_compact_form() {
        let h = headers(vec![Header::Other("x".into(), "300".into())]);
        assert_eq!(
            session_expires_in(&h),
            Some(SessionExpires {
                interval_secs: 300,
                refresher: None
            })
        );
    }

    // Both a missing header and an unparseable one yield `None`.
    #[test]
    fn session_expires_in_absent_or_malformed_is_none() {
        assert_eq!(session_expires_in(&headers(vec![])), None);
        let h = headers(vec![Header::Other("Session-Expires".into(), "soon".into())]);
        assert_eq!(session_expires_in(&h), None);
    }

    // `Min-SE` lookup: case-insensitive name, parameters dropped, bad values yield `None`.
    #[test]
    fn min_se_in_parses_and_ignores_params() {
        let h = headers(vec![Header::Other("Min-SE".into(), "120".into())]);
        assert_eq!(min_se_in(&h), Some(120));
        let h = headers(vec![Header::Other("min-se".into(), "240;lr".into())]);
        assert_eq!(min_se_in(&h), Some(240));
        assert_eq!(min_se_in(&headers(vec![])), None);
        let h = headers(vec![Header::Other("Min-SE".into(), "never".into())]);
        assert_eq!(min_se_in(&h), None);
    }

    // `timer` is found in typed, untyped and compact (`k`) Supported headers,
    // but only as a whole tag.
    #[test]
    fn supports_timer_scans_typed_untyped_and_compact() {
        assert!(supports_timer(&headers(vec![supported_timer_header()])));
        assert!(supports_timer(&headers(vec![Header::Supported(
            "100rel, timer".into()
        )])));
        assert!(supports_timer(&headers(vec![Header::Other(
            "k".into(),
            "timer".into()
        )])));
        assert!(supports_timer(&headers(vec![Header::Other(
            "Supported".into(),
            "TIMER".into()
        )])));
        assert!(!supports_timer(&headers(vec![])));
        assert!(!supports_timer(&headers(vec![Header::Supported(
            "100rel".into()
        )])));
        // A tag that merely starts with `timer` must not match.
        assert!(!supports_timer(&headers(vec![Header::Supported(
            "timers".into()
        )])));
    }

    // `refresh_after` is half the interval.
    #[test]
    fn refresh_fires_at_half_the_interval() {
        let t = SessionTimer {
            interval_secs: 1800,
            we_are_refresher: true,
        };
        assert_eq!(t.refresh_after(), Duration::from_secs(900));
        let t = SessionTimer {
            interval_secs: 90,
            we_are_refresher: true,
        };
        assert_eq!(t.refresh_after(), Duration::from_secs(45));
    }

    // `expiry_after` subtracts min(32, interval / 3) from the interval.
    #[test]
    fn expiry_keeps_min_of_32s_or_a_third_headroom() {
        let t = SessionTimer {
            // 1800 / 3 = 600, capped at 32: 1800 - 32 = 1768.
            interval_secs: 1800,
            we_are_refresher: false,
        };
        assert_eq!(t.expiry_after(), Duration::from_secs(1768));
        let t = SessionTimer {
            // 90 / 3 = 30, below the cap: 90 - 30 = 60.
            interval_secs: 90,
            we_are_refresher: false,
        };
        assert_eq!(t.expiry_after(), Duration::from_secs(60));
    }

    // No Session-Expires in the response means no timer on the requesting side.
    #[test]
    fn uac_no_session_expires_means_no_timer() {
        assert_eq!(negotiate_uac(&headers(vec![])), None);
    }

    // `refresher=uas` in the response leaves refreshing to the peer.
    #[test]
    fn uac_refresher_uas_means_peer_refreshes() {
        let h = headers(vec![Header::Other(
            "Session-Expires".into(),
            "1800;refresher=uas".into(),
        )]);
        assert_eq!(
            negotiate_uac(&h),
            Some(SessionTimer {
                interval_secs: 1800,
                we_are_refresher: false
            })
        );
    }

    // `refresher=uac`, or no refresher at all, makes the local side refresh.
    #[test]
    fn uac_refresher_uac_or_missing_means_we_refresh() {
        let h = headers(vec![Header::Other(
            "Session-Expires".into(),
            "600;refresher=uac".into(),
        )]);
        assert!(negotiate_uac(&h).unwrap().we_are_refresher);
        let h = headers(vec![Header::Other("Session-Expires".into(), "600".into())]);
        assert!(negotiate_uac(&h).unwrap().we_are_refresher);
    }

    // Intervals below the minimum are raised to it.
    #[test]
    fn uac_floors_tiny_grants_at_90s() {
        let h = headers(vec![Header::Other(
            "Session-Expires".into(),
            "20;refresher=uac".into(),
        )]);
        assert_eq!(negotiate_uac(&h).unwrap().interval_secs, 90);
    }

    /// Builds request headers with a `Session-Expires` value, an optional
    /// `Min-SE` value, and `Supported: timer` when `timer` is true.
    fn invite_headers(session_expires: &str, min_se: Option<&str>, timer: bool) -> rsip::Headers {
        let mut items = vec![Header::Other(
            "Session-Expires".into(),
            session_expires.into(),
        )];
        if let Some(m) = min_se {
            items.push(Header::Other("Min-SE".into(), m.into()));
        }
        if timer {
            items.push(supported_timer_header());
        }
        headers(items)
    }

    // `Supported: timer` alone, without Session-Expires, does not start a timer.
    #[test]
    fn uas_no_session_expires_means_no_timer() {
        assert_eq!(
            negotiate_uas(&headers(vec![supported_timer_header()])),
            None
        );
    }

    // With timer support and no explicit refresher, the peer (uac) refreshes.
    #[test]
    fn uas_default_makes_supporting_peer_the_refresher() {
        let uas = negotiate_uas(&invite_headers("1800", None, true)).unwrap();
        assert_eq!(uas.timer.interval_secs, 1800);
        assert!(!uas.timer.we_are_refresher, "peer (UAC) should refresh");
        assert_eq!(
            uas.echo,
            SessionExpires {
                interval_secs: 1800,
                refresher: Some(Refresher::Uac)
            }
        );
        assert!(uas.require_timer);
    }

    // An explicit `refresher=uas` request is honoured.
    #[test]
    fn uas_honors_requested_refresher_uas() {
        let uas = negotiate_uas(&invite_headers("1800;refresher=uas", None, true)).unwrap();
        assert!(uas.timer.we_are_refresher, "we (UAS) were asked to refresh");
        assert_eq!(uas.echo.refresher, Some(Refresher::Uas));
    }

    // Without `Supported: timer` the requested `refresher=uac` is overridden:
    // the local side refreshes and `require_timer` stays false.
    #[test]
    fn uas_without_peer_support_takes_refresher_role() {
        let uas = negotiate_uas(&invite_headers("1800;refresher=uac", None, false)).unwrap();
        assert!(uas.timer.we_are_refresher);
        assert_eq!(uas.echo.refresher, Some(Refresher::Uas));
        assert!(!uas.require_timer);
    }

    // The interval is raised to Min-SE when present, and to 90 s otherwise.
    #[test]
    fn uas_floors_interval_at_min_se_and_90s() {
        let uas = negotiate_uas(&invite_headers("30", Some("120"), true)).unwrap();
        assert_eq!(uas.timer.interval_secs, 120);
        assert_eq!(uas.echo.interval_secs, 120);
        let uas = negotiate_uas(&invite_headers("30", None, true)).unwrap();
        assert_eq!(uas.timer.interval_secs, 90);
    }

    /// A call recorded by [`MockDialog`].
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum Event {
        /// `refresh` was called; holds the `Session-Expires` value it was given
        /// (empty if that header was missing).
        Refresh { session_expires: String },
        /// `send_bye` was called.
        Bye,
    }

    /// Scripted `SessionDialogOps` implementation that records every call with
    /// the time elapsed since the mock was created.
    struct MockDialog {
        /// Recorded calls, in order, with their elapsed time.
        events: Mutex<Vec<(Duration, Event)>>,
        /// Replies handed out by `refresh`, front first.
        refresh_replies: Mutex<Vec<Result<Option<rsip::Response>, String>>>,
        /// Creation instant on the tokio clock.
        started: tokio::time::Instant,
    }

    impl MockDialog {
        /// Creates a mock that will answer successive `refresh` calls with `refresh_replies`.
        fn new(refresh_replies: Vec<Result<Option<rsip::Response>, String>>) -> Self {
            Self {
                events: Mutex::new(Vec::new()),
                refresh_replies: Mutex::new(refresh_replies),
                started: tokio::time::Instant::now(),
            }
        }

        /// Returns a snapshot of the calls recorded so far.
        fn events(&self) -> Vec<(Duration, Event)> {
            self.events.lock().unwrap().clone()
        }
    }

    /// Builds a bodiless response with the given status code and headers.
    fn response(code: u16, extra: Vec<Header>) -> rsip::Response {
        rsip::Response {
            status_code: rsip::StatusCode::from(code),
            version: rsip::Version::V2,
            headers: headers(extra),
            body: Vec::new(),
        }
    }

    impl SessionDialogOps for MockDialog {
        // Records the refresh and returns the next scripted reply.
        // Panics if the script is exhausted (`remove(0)` on an empty Vec).
        async fn refresh(
            &self,
            hdrs: Vec<Header>,
            _body: Option<Vec<u8>>,
        ) -> Result<Option<rsip::Response>, BoxError> {
            let se = hdrs
                .iter()
                .find_map(|h| match h {
                    Header::Other(name, value) if name == "Session-Expires" => Some(value.clone()),
                    _ => None,
                })
                .unwrap_or_default();
            self.events.lock().unwrap().push((
                self.started.elapsed(),
                Event::Refresh {
                    session_expires: se,
                },
            ));
            let reply = self.refresh_replies.lock().unwrap().remove(0);
            reply.map_err(Into::into)
        }

        // Records the BYE and always succeeds.
        async fn send_bye(&self) -> Result<(), BoxError> {
            self.events
                .lock()
                .unwrap()
                .push((self.started.elapsed(), Event::Bye));
            Ok(())
        }
    }

    /// Shorthand constructor for a `SessionTimer`.
    fn timer(interval_secs: u32, we_are_refresher: bool) -> SessionTimer {
        SessionTimer {
            interval_secs,
            we_are_refresher,
        }
    }

    // With a 180 s interval the refresher fires every 90 s and stops once the
    // dialog reports it is gone.
    #[tokio::test(start_paused = true)]
    async fn refresher_sends_refresh_every_half_interval() {
        let dialog = Arc::new(MockDialog::new(vec![
            Ok(Some(response(200, vec![]))),
            Ok(Some(response(200, vec![]))),
            // Third refresh: the dialog is gone, which ends the loop.
            Ok(None),
        ]));
        let cancel = CancellationToken::new();
        let outcome = session_timer_loop(
            &*dialog,
            timer(180, true),
            None,
            Arc::new(Notify::new()),
            cancel,
        )
        .await;
        assert_eq!(outcome, SessionTimerOutcome::DialogGone);

        let events = dialog.events();
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].0, Duration::from_secs(90));
        assert_eq!(events[1].0, Duration::from_secs(180));
        assert_eq!(events[2].0, Duration::from_secs(270));
        for (_, e) in &events {
            assert_eq!(
                e,
                &Event::Refresh {
                    session_expires: "180;refresher=uac".into()
                }
            );
        }
    }

    // A Session-Expires in the 2xx changes the interval used for later refreshes.
    #[tokio::test(start_paused = true)]
    async fn refresher_adopts_interval_granted_in_refresh_response() {
        let regrant = response(
            200,
            vec![Header::Other(
                "Session-Expires".into(),
                "360;refresher=uac".into(),
            )],
        );
        let dialog = Arc::new(MockDialog::new(vec![Ok(Some(regrant)), Ok(None)]));
        let cancel = CancellationToken::new();
        let outcome = session_timer_loop(
            &*dialog,
            timer(180, true),
            None,
            Arc::new(Notify::new()),
            cancel,
        )
        .await;
        assert_eq!(outcome, SessionTimerOutcome::DialogGone);

        let events = dialog.events();
        assert_eq!(events[0].0, Duration::from_secs(90), "first at 180/2");
        assert_eq!(
            events[1].0,
            Duration::from_secs(90 + 180),
            "second at 90 + 360/2 after the re-grant"
        );
    }

    // A non-2xx refresh response is followed by a BYE.
    #[tokio::test(start_paused = true)]
    async fn refresher_rejected_refresh_sends_bye() {
        let dialog = Arc::new(MockDialog::new(vec![Ok(Some(response(481, vec![])))]));
        let cancel = CancellationToken::new();
        let outcome = session_timer_loop(
            &*dialog,
            timer(180, true),
            None,
            Arc::new(Notify::new()),
            cancel,
        )
        .await;
        assert_eq!(outcome, SessionTimerOutcome::RefreshFailed);
        let events = dialog.events();
        assert!(matches!(events[0].1, Event::Refresh { .. }));
        assert_eq!(events[1].1, Event::Bye);
    }

    // An `Err` from `refresh` is also followed by a BYE.
    #[tokio::test(start_paused = true)]
    async fn refresher_transport_error_sends_bye() {
        let dialog = Arc::new(MockDialog::new(vec![Err("socket closed".into())]));
        let cancel = CancellationToken::new();
        let outcome = session_timer_loop(
            &*dialog,
            timer(180, true),
            None,
            Arc::new(Notify::new()),
            cancel,
        )
        .await;
        assert_eq!(outcome, SessionTimerOutcome::RefreshFailed);
        assert_eq!(dialog.events().last().unwrap().1, Event::Bye);
    }

    // A token cancelled before the loop starts ends it without touching the dialog.
    #[tokio::test(start_paused = true)]
    async fn refresher_cancellation_wins_before_first_refresh() {
        let dialog = Arc::new(MockDialog::new(vec![]));
        let cancel = CancellationToken::new();
        cancel.cancel();
        let outcome = session_timer_loop(
            &*dialog,
            timer(180, true),
            None,
            Arc::new(Notify::new()),
            cancel,
        )
        .await;
        assert_eq!(outcome, SessionTimerOutcome::Cancelled);
        assert!(dialog.events().is_empty(), "no refresh, no BYE");
    }

    // With no peer refresh the watchdog sends a BYE at `expiry_after` (60 s for a 90 s interval).
    #[tokio::test(start_paused = true)]
    async fn watchdog_sends_bye_when_session_lapses() {
        let dialog = Arc::new(MockDialog::new(vec![]));
        let cancel = CancellationToken::new();
        let outcome = session_timer_loop(
            &*dialog,
            timer(90, false),
            None,
            Arc::new(Notify::new()),
            cancel,
        )
        .await;
        assert_eq!(outcome, SessionTimerOutcome::Expired);
        let events = dialog.events();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0], (Duration::from_secs(60), Event::Bye));
    }

    // A peer refresh just before the deadline restarts the full 60 s window.
    #[tokio::test(start_paused = true)]
    async fn watchdog_resets_deadline_on_peer_refresh() {
        let dialog = Arc::new(MockDialog::new(vec![]));
        let cancel = CancellationToken::new();
        let refreshed = Arc::new(Notify::new());

        let loop_task = tokio::spawn({
            let dialog = dialog.clone();
            let refreshed = refreshed.clone();
            let cancel = cancel.clone();
            async move { session_timer_loop(&*dialog, timer(90, false), None, refreshed, cancel).await }
        });

        // Notify at 59 s, one second before the original 60 s deadline.
        tokio::time::sleep(Duration::from_secs(59)).await;
        refreshed.notify_one();
        // Let the loop task observe the notification before time advances.
        tokio::task::yield_now().await;
        // At 89 s the original deadline has passed, yet no BYE may have been sent.
        tokio::time::sleep(Duration::from_secs(30)).await;
        assert!(dialog.events().is_empty(), "deadline should have reset");
        // The restarted deadline is 59 + 60 = 119 s.
        let outcome = loop_task.await.unwrap();
        assert_eq!(outcome, SessionTimerOutcome::Expired);
        assert_eq!(
            dialog.events(),
            vec![(Duration::from_secs(119), Event::Bye)]
        );
    }

    // Cancelling the watchdog before its deadline ends it without a BYE.
    #[tokio::test(start_paused = true)]
    async fn watchdog_cancellation_stands_down_without_bye() {
        let dialog = Arc::new(MockDialog::new(vec![]));
        let cancel = CancellationToken::new();
        let loop_task = tokio::spawn({
            let dialog = dialog.clone();
            let cancel = cancel.clone();
            async move {
                session_timer_loop(
                    &*dialog,
                    timer(90, false),
                    None,
                    Arc::new(Notify::new()),
                    cancel,
                )
                .await
            }
        });
        tokio::time::sleep(Duration::from_secs(10)).await;
        cancel.cancel();
        assert_eq!(loop_task.await.unwrap(), SessionTimerOutcome::Cancelled);
        assert!(dialog.events().is_empty());
    }
}